本文主要記錄了在springboot中透過事件釋出機制+延遲佇列實現定時任務的需求,主要內容有:
如何使用Spring的事件釋出機制
使用延遲任務實現定時功能
實現的思維導圖如下:
事件釋出機制
事件釋出機制需要實現三個角色,分別為
事件的釋出者
publisher
事件的消費者即
listener
通訊的基本單元
event
事件釋出機制的通訊單元
在Spring
框架中、提供了對事件的封裝,實體類只需要繼承ApplicationEvent
抽象類即可。程式碼如下:
@Getter @Setter public class CustomSpringEvent extends ApplicationEvent { private String message; public CustomSpringEvent(Object source, String message) { super(source); this.message = message; } }
可以把CustomSpringEvent
理解為這個事件傳輸的基本單位、封裝了我們需要傳遞的資訊。
事件釋出者機制的生產者
事件釋出者:主要職責就是傳送任務的,相當於告訴監聽者、該起床幹活了。具體程式碼如下:
@Component public class CustomEventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } // 釋出事件 public void publish(String message) { CustomSpringEvent event = new CustomSpringEvent(this, message); publisher.publishEvent(event); } }
當然也可以直接自動注入private ApplicationEventPublisher publisher;
透過呼叫介面的publishEvent()
方法進行事件釋出。
事件釋出機制的消費者
監聽者的實現如下:
@Slf4j @Component public class CustomEventListener implements ApplicationListener<CustomSpringEvent> { @Override public void onApplicationEvent(CustomSpringEvent event) { log.info("收到事件:{}", event.getMessage()); } }
在測試包下寫下如下程式碼進行程式碼測試:
@SpringBootTest public class EventTest { @Autowired private CustomEventPublisher eventPublisher; @Test public void testEvent() { eventPublisher.publish("hello world"); } }
可以看到執行結果符合預期。事件成功釋出和消費。
延遲任務
延遲任務、在JUC
包下提供了Delayed
介面、該介面實現類Comparable
介面、因此需要實現如下兩個方法:
getDelay()
:用於計算事件發生距離當前還有多久。
compareTo()
:用於比較不同延遲任務之間的先後順序。
除此之外、爲了儲存多個定時任務、我們需要一個容器即延遲佇列DeleayQueue
、延遲佇列DelayQueue
實現了BlockingQueue
。關於BlockQueue
的介紹請參看此文。延遲佇列的會根據事件發生的先後順序進行排列,時間到了就可以呼叫take()
方法取出消費,否則就阻塞在此。
因此需要改造一下之前的事件類。在原始碼的基礎上修改程式碼如下:
@Getter @Setter public class CustomSpringEvent extends ApplicationEvent implements Delayed { private String message; public CustomSpringEvent(Object source, String message,long millis) { // 用於計算任務距離當前的時間差 // millis:距離當前系統時間多少毫秒之後執行。 super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofMillis(millis))); this.message = message; } @Override public long getDelay(TimeUnit unit) { long millis = this.getTimestamp(); long currentTime = System.currentTimeMillis(); long duration = millis - currentTime; return unit.convert(duration, unit); } @Override public int compareTo(Delayed o) { long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (delta < 0 ? -1 : (delta > 0 ? 1 : 0)); } }
此時需要重新捋一下事件的釋出過程了、不再是簡單的釋出消費了。
先入隊儲存延時任務。
等任務可以取出了在釋出。
因此修改釋出程式碼如下:
@Component @Slf4j public class CustomEventPublisher implements ApplicationEventPublisherAware { private ApplicationEventPublisher publisher; // 延遲任務佇列容器 private DelayQueue<CustomSpringEvent> delayQueue = new DelayQueue<>(); @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.publisher = applicationEventPublisher; } // 延遲任務入隊 public void publish(CustomSpringEvent event) { boolean offer = this.delayQueue.offer(event); if (offer) { log.info("延遲任務 [{}] 已加入佇列",event.getMessage()); } } // 延遲任務出隊併發布 private void publishDelayTask() { while (true) { try { CustomSpringEvent event = this.delayQueue.take(); this.publisher.publishEvent(event); } catch (InterruptedException e) { e.printStackTrace(); } } } @PostConstruct public void start() { new Thread(() -> publishDelayTask()).start(); } }
編寫測試程式碼如下:
@Test public void testEvent() { CustomSpringEvent task1 = new CustomSpringEvent(this, "5s後執行", 5000); CustomSpringEvent task2 = new CustomSpringEvent(this, "10s後執行", 10000); CustomSpringEvent task3 = new CustomSpringEvent(this, "20s後執行", 20000); eventPublisher.publish(task1); eventPublisher.publish(task2); eventPublisher.publish(task3); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }
執行結果如下:
可以看到結果符合預期、沒有任何毛病!
最佳化點
透過執行緒池來來執行任務。配置執行緒池:
@Configuration @EnableAsync public class ThreadPoolConfiguration { @Bean("listenerExecutor") public ThreadPoolExecutor poolExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); return threadPoolExecutor; } @Bean("publisherExecutor") public ThreadPoolExecutor publisherExecutor() { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); return threadPoolExecutor; } }
關於監聽器的實現可以透過註解來實現
@Slf4j @Component @EnableAsync public class CustomEventListener { @EventListener @Async("listenerExecutor") public void onCustomEvent(CustomSpringEvent event) { log.info("Received spring custom event - {}", event.getMessage()); } }
釋出者也可以不實現介面的方式來實現
@Component @Slf4j public class EventPublisher { @Autowired private ApplicationEventPublisher applicationEventPublisher;; public void publish(CustomSpringEvent event) { log.info("publish event:{}",event); applicationEventPublisher.publishEvent(event); } }
關於任務的釋出這一個也可以交給執行緒池實現
@Autowired @Qualifier("publisherExecutor") private ThreadPoolExecutor publisherThreadPool; @PostConstruct public void start() { //new Thread(() -> publishDelayTask()).start(); publisherThreadPool.execute(() -> publishDelayTask()); }