本文主要记录了在springboot中通过事件发布机制+延迟队列实现定时任务的需求,主要内容有:
如何使用Spring的事件发布机制
使用延迟任务实现定时功能
实现的思维导图如下:
事件发布机制
事件发布机制需要实现三个角色,分别为
事件的发布者
publisher
事件的消费者即
listener
通信的基本单元
event
事件发布机制的通信单元
在Spring
框架中、提供了对事件的封装,实体类只需要继承ApplicationEvent
抽象类即可。代码如下:
@Getter @Setter public class CustomSpringEvent extends ApplicationEvent { private String message; public CustomSpringEvent(Object source, String message) { super(source); this.message = message; } }
可以把CustomSpringEvent
理解为这个事件传输的基本单位、封装了我们需要传递的信息。
事件发布者机制的生产者
事件发布者:主要职责就是发送任务的,相当于告诉监听者、该起床干活了。具体代码如下:
@Component public class CustomEventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } // 发布事件 public void publish(String message) { CustomSpringEvent event = new CustomSpringEvent(this, message); publisher.publishEvent(event); } }
当然也可以直接自动注入private ApplicationEventPublisher publisher;
通过调用接口的publishEvent()
方法进行事件发布。
事件发布机制的消费者
监听者的实现如下:
@Slf4j @Component public class CustomEventListener implements ApplicationListener<CustomSpringEvent> { @Override public void onApplicationEvent(CustomSpringEvent event) { log.info("收到事件:{}", event.getMessage()); } }
在测试包下写下如下代码进行代码测试:
@SpringBootTest public class EventTest { @Autowired private CustomEventPublisher eventPublisher; @Test public void testEvent() { eventPublisher.publish("hello world"); } }
可以看到运行结果符合预期。事件成功发布和消费。
延迟任务
延迟任务、在JUC
包下提供了Delayed
接口、该接口实现类Comparable
接口、因此需要实现如下两个方法:
getDelay()
:用于计算事件发生距离当前还有多久。
compareTo()
:用于比较不同延迟任务之间的先后顺序。
除此之外、为了保存多个定时任务、我们需要一个容器即延迟队列DeleayQueue
、延迟队列DelayQueue
实现了BlockingQueue
。关于BlockQueue
的介绍请参看此文。延迟队列的会根据事件发生的先后顺序进行排列,时间到了就可以调用take()
方法取出消费,否则就阻塞在此。
因此需要改造一下之前的事件类。在原代码的基础上修改代码如下:
@Getter @Setter public class CustomSpringEvent extends ApplicationEvent implements Delayed { private String message; public CustomSpringEvent(Object source, String message,long millis) { // 用于计算任务距离当前的时间差 // millis:距离当前系统时间多少毫秒之后执行。 super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofMillis(millis))); this.message = message; } @Override public long getDelay(TimeUnit unit) { long millis = this.getTimestamp(); long currentTime = System.currentTimeMillis(); long duration = millis - currentTime; return unit.convert(duration, unit); } @Override public int compareTo(Delayed o) { long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (delta < 0 ? -1 : (delta > 0 ? 1 : 0)); } }
此时需要重新捋一下事件的发布过程了、不再是简单的发布消费了。
先入队保存延时任务。
等任务可以取出了在发布。
因此修改发布代码如下:
@Component @Slf4j public class CustomEventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; // 延迟任务队列容器 private DelayQueue<CustomSpringEvent> delayQueue = new DelayQueue<>(); @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } // 延迟任务入队 public void publish(CustomSpringEvent event) { boolean offer = this.delayQueue.offer(event); if (offer) { log.info("延迟任务 [{}] 已加入队列",event.getMessage()); } } // 延迟任务出队并发布 private void publishDelayTask() { while (true) { try { CustomSpringEvent event = this.delayQueue.take(); this.publisher.publishEvent(event); } catch (InterruptedException e) { e.printStackTrace(); } } } @PostConstruct public void start() { new Thread(() -> publishDelayTask()).start(); } }
编写测试代码如下:
@Test public void testEvent() { CustomSpringEvent task1 = new CustomSpringEvent(this, "5s后执行", 5000); CustomSpringEvent task2 = new CustomSpringEvent(this, "10s后执行", 10000); CustomSpringEvent task3 = new CustomSpringEvent(this, "20s后执行", 20000); eventPublisher.publish(task1); eventPublisher.publish(task2); eventPublisher.publish(task3); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }
运行结果如下:
可以看到结果符合预期、没有任何毛病!
优化点
通过线程池来来执行任务。配置线程池:
@Configuration @EnableAsync public class ThreadPoolConfiguration { @Bean("listenerExecutor") public ThreadPoolExecutor poolExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); return threadPoolExecutor; } @Bean("publisherExecutor") public ThreadPoolExecutor publisherExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); return threadPoolExecutor; } }
关于监听器的实现可以通过注解来实现
@Slf4j @Component @EnableAsync public class CustomEventListener { @EventListener @Async("listenerExecutor") public void onCustomEvent(CustomSpringEvent event) { log.info("Received spring custom event - {}", event.getMessage()); } }
发布者也可以不实现接口的方式来实现
@Component @Slf4j public class EventPublisher { @Autowired private ApplicationEventPublisher applicationEventPublisher;; public void publish(CustomSpringEvent event) { log.info("publish event:{}",event); applicationEventPublisher.publishEvent(event); } }
关于任务的发布这一个也可以交给线程池实现
@Autowired @Qualifier("publisherExecutor") private ThreadPoolExecutor publisherThreadPool; @PostConstruct public void start() { //new Thread(() -> publishDelayTask()).start(); publisherThreadPool.execute(() -> publishDelayTask()); }