切換語言為:簡體

SpringBoot基於延遲佇列 + 事件釋出機制實現定時任務

  • 爱糖宝
  • 2024-09-26
  • 2049
  • 0
  • 0

本文主要記錄了在springboot中透過事件釋出機制+延遲佇列實現定時任務的需求,主要內容有:

  • 如何使用Spring的事件釋出機制

  • 使用延遲任務實現定時功能

實現的思維導圖如下:

SpringBoot基於延遲佇列 + 事件釋出機制實現定時任務

事件釋出機制

事件釋出機制需要實現三個角色,分別為

  • 事件的釋出者publisher

  • 事件的消費者即listener

  • 通訊的基本單元event

事件釋出機制的通訊單元

Spring框架中、提供了對事件的封裝,實體類只需要繼承ApplicationEvent抽象類即可。程式碼如下:

@Getter
@Setter
public class CustomSpringEvent extends ApplicationEvent {
    private String message;
    public CustomSpringEvent(Object source, String message) {
        super(source);
        this.message = message;
    }
}

可以把CustomSpringEvent理解為這個事件傳輸的基本單位、封裝了我們需要傳遞的資訊。

事件釋出者機制的生產者

事件釋出者:主要職責就是傳送任務的,相當於告訴監聽者、該起床幹活了。具體程式碼如下:

@Component
public class CustomEventPublisher implements ApplicationEventPublisherAware {
    private ApplicationEventPublisher publisher;
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
    // 釋出事件
    public void publish(String message) {
        CustomSpringEvent event = new CustomSpringEvent(this, message);
        publisher.publishEvent(event);
    }
}

當然也可以直接自動注入private ApplicationEventPublisher publisher;透過呼叫介面的publishEvent()方法進行事件釋出。

事件釋出機制的消費者

監聽者的實現如下:

@Slf4j
@Component
public class CustomEventListener implements ApplicationListener<CustomSpringEvent> {
    @Override
    public void onApplicationEvent(CustomSpringEvent event) {
        log.info("收到事件:{}", event.getMessage());
    }
}

在測試包下寫下如下程式碼進行程式碼測試:

@SpringBootTest
public class EventTest {

    @Autowired
    private CustomEventPublisher eventPublisher;
    @Test
    public void testEvent() {
        eventPublisher.publish("hello world");
    }
}

可以看到執行結果符合預期。事件成功釋出和消費。

SpringBoot基於延遲佇列 + 事件釋出機制實現定時任務

延遲任務

延遲任務、在JUC包下提供了Delayed介面、該介面實現類Comparable介面、因此需要實現如下兩個方法:

  • getDelay():用於計算事件發生距離當前還有多久。

  • compareTo():用於比較不同延遲任務之間的先後順序。

除此之外、爲了儲存多個定時任務、我們需要一個容器即延遲佇列DeleayQueue、延遲佇列DelayQueue實現了BlockingQueue。關於BlockQueue的介紹請參看此文。延遲佇列的會根據事件發生的先後順序進行排列,時間到了就可以呼叫take()方法取出消費,否則就阻塞在此。

因此需要改造一下之前的事件類。在原始碼的基礎上修改程式碼如下:

    @Getter
    @Setter
    public class CustomSpringEvent extends ApplicationEvent  implements Delayed {
   
        private String message;
        public CustomSpringEvent(Object source, String message,long millis) {
            // 用於計算任務距離當前的時間差
            // millis:距離當前系統時間多少毫秒之後執行。
            super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofMillis(millis)));
            this.message = message;
        }
   
        @Override
        public long getDelay(TimeUnit unit) {
            long millis = this.getTimestamp();
            long currentTime = System.currentTimeMillis();
            long duration = millis - currentTime;
            return unit.convert(duration, unit);
        }
   
        @Override
        public int compareTo(Delayed o) {
            long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return (delta < 0 ? -1 : (delta > 0 ? 1 : 0));
        }
    }

此時需要重新捋一下事件的釋出過程了、不再是簡單的釋出消費了。

  • 先入隊儲存延時任務。

  • 等任務可以取出了在釋出。

因此修改釋出程式碼如下:

@Component
@Slf4j
public class CustomEventPublisher implements ApplicationEventPublisherAware {
    private ApplicationEventPublisher publisher;
    // 延遲任務佇列容器
    private DelayQueue<CustomSpringEvent> delayQueue = new DelayQueue<>();
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
    // 延遲任務入隊
    public void publish(CustomSpringEvent event) {
        boolean offer = this.delayQueue.offer(event);
        if (offer) {
            log.info("延遲任務 [{}] 已加入佇列",event.getMessage());
        }
    }

    // 延遲任務出隊併發布
    private void publishDelayTask() {
        while (true) {
            try {
                CustomSpringEvent event = this.delayQueue.take();
                this.publisher.publishEvent(event);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    @PostConstruct
    public void start() {
        new Thread(() -> publishDelayTask()).start();
    }
}

編寫測試程式碼如下:

@Test
public void testEvent() {

    CustomSpringEvent task1 = new CustomSpringEvent(this, "5s後執行", 5000);
    CustomSpringEvent task2 = new CustomSpringEvent(this, "10s後執行", 10000);
    CustomSpringEvent task3 = new CustomSpringEvent(this, "20s後執行", 20000);

    eventPublisher.publish(task1);
    eventPublisher.publish(task2);
    eventPublisher.publish(task3);

    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

執行結果如下:

SpringBoot基於延遲佇列 + 事件釋出機制實現定時任務

可以看到結果符合預期、沒有任何毛病!

最佳化點

  1. 透過執行緒池來來執行任務。配置執行緒池:

@Configuration
@EnableAsync
public class ThreadPoolConfiguration {

    @Bean("listenerExecutor")
    public ThreadPoolExecutor poolExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                10, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        return threadPoolExecutor;
    }
    @Bean("publisherExecutor")
    public ThreadPoolExecutor publisherExecutor() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                10, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
        return threadPoolExecutor;
    }
}

  1. 關於監聽器的實現可以透過註解來實現

@Slf4j
@Component
@EnableAsync
public class CustomEventListener {
    @EventListener
    @Async("listenerExecutor")
    public void onCustomEvent(CustomSpringEvent event) {
        log.info("Received spring custom event - {}", event.getMessage());
    }
}

  1. 釋出者也可以不實現介面的方式來實現

@Component
@Slf4j
public class EventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;;

    public void publish(CustomSpringEvent event)
    {
        log.info("publish event:{}",event);
        applicationEventPublisher.publishEvent(event);
    }

}

  1. 關於任務的釋出這一個也可以交給執行緒池實現

@Autowired
@Qualifier("publisherExecutor")
private ThreadPoolExecutor publisherThreadPool;

@PostConstruct
public void start() {
  //new Thread(() -> publishDelayTask()).start();
  publisherThreadPool.execute(() -> publishDelayTask());
}

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.