切換語言為:簡體

RocketMQ 消費訊息的Push模式,是推還是拉?

  • 爱糖宝
  • 2024-06-21
  • 2079
  • 0
  • 0

RocketMQ 消費訊息是推還是拉?

RocketMQ 是一個分散式訊息中介軟體,廣泛應用於高效能、高可靠性的訊息傳輸場景。對於很多開發者來說,理解 RocketMQ 消費訊息的機制是至關重要的。一個常見的問題是:RocketMQ 的訊息消費是推(Push)還是拉(Pull)?

消費訊息的基本模式

在訊息中介軟體的設計中,訊息消費通常有兩種模式:

  1. 推模式(Push):訊息中介軟體主動將訊息推送給消費者。

  2. 拉模式(Pull):消費者主動從訊息中介軟體拉取訊息。

這兩種模式各有優缺點:

  • 推模式的優點是訊息可以立即送達消費者,缺點是如果消費者處理能力不足,可能會導致訊息堆積或丟失。

  • 拉模式的優點是消費者可以根據自身的處理能力主動控制訊息的拉取速度,缺點是可能會增加延遲。

RocketMQ 的消費機制

RocketMQ 的訊息消費機制是一種混合模式,表面上看是推模式,但實際上是基於拉模式實現的

+-------------------+
| 1. 消費者初始化   |
+-------------------+
        |
        v
+-------------------+
| 建立消費者例項   |
| 設定 NameServer  |
| 訂閱主題         |
| 註冊監聽器       |
+-------------------+
        |
        v
+-----------------------+
| 2. 內部拉取執行緒       |
+-----------------------+
        |
        v
+-----------------------+
| 啟動多個拉取執行緒     |
| 每個執行緒執行         |
| PullMessageService   |
+-----------------------+
        |
        v
+-----------------------+
| 3. 訊息拉取           |
+-----------------------+
        |
        v
+--------------------------+
| 從佇列中取出 PullRequest |
+--------------------------+
        |
        v
+--------------------------+
| 呼叫 consumerImpl.pull  |
| Message 拉取訊息         |
+--------------------------+
        |
        v
+--------------------------+
| 如果找到訊息,將訊息     |
| 放入 ConsumeMessageService |
| 佇列                     |
+--------------------------+
        |
        v
+-----------------------+
| 4. 訊息推送給監聽器   |
+-----------------------+
        |
        v
+-------------------------+
| ConsumeMessageService  |
| 從佇列中處理訊息       |
+-------------------------+
        |
        v
+-----------------------------+
| 呼叫註冊的 MessageListener |
| Concurrently               |
+-----------------------------+
        |
        v
+-----------------------+
| 監聽器處理訊息       |
+-----------------------+


1. 消費者啟動並註冊監聽器

當消費者啟動時,使用者會建立 DefaultMQPushConsumer 例項,並註冊一個 MessageListenerConcurrently 監聽器。程式碼示例如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("Receive message: %s%n", new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();
System.out.printf("Consumer started.%n");


2. 內部拉取執行緒

消費者內部啟動多個拉取執行緒,這些執行緒不斷從 Broker 拉取訊息。這個過程由 PullMessageService 服務實現:

public class PullMessageService extends ServiceThread {
    private final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private final DefaultMQPushConsumerImpl consumerImpl;

    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException e) {
                log.error("PullMessageService interrupted", e);
            }
        }
    }

    private void pullMessage(PullRequest pullRequest) {
        // 呼叫拉取訊息的方法
        this.consumerImpl.pullMessage(pullRequest);
    }
}


3. 訊息推送給監聽器

拉取到的訊息會被放入 ConsumeMessageService 的阻塞佇列中,然後由消費者的 ConsumeMessageService 服務處理。ConsumeMessageService 會從阻塞佇列中取出訊息,並呼叫使用者註冊的監聽器進行處理:

public void pullMessage(PullRequest pullRequest) {
    // 拉取訊息
    PullResult pullResult = this.pullKernelImpl.pull(pullRequest);
    switch (pullResult.getPullStatus()) {
        case FOUND:
            // 將拉取到的訊息放入阻塞佇列
            this.consumeMessageService.submitConsumeRequest(
                pullResult.getMsgFoundList(),
                pullRequest.getProcessQueue(),
                pullRequest.getMessageQueue(),
                false
            );
            break;
        // 其他狀態處理
    }
}


ConsumeMessageService 會從阻塞佇列中取出訊息,並呼叫註冊的監聽器:

public class ConsumeMessageConcurrentlyService extends ConsumeMessageService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                // 從阻塞佇列中取出訊息
                ConsumeRequest consumeRequest = this.consumeRequestQueue.take();
                // 呼叫註冊的監聽器進行處理
                this.consumeMessage(consumeRequest);
            } catch (InterruptedException e) {
                log.error("ConsumeMessageService interrupted", e);
            }
        }
    }

    private void consumeMessage(ConsumeRequest consumeRequest) {
        List<MessageExt> msgs = consumeRequest.getMsgs();
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(consumeRequest.getMessageQueue());

        ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
        // 根據返回的狀態進行相應的處理
    }
}

總結

RocketMQ 的消費機制實際上是一種混合模式,內部採用拉取模式實現訊息的獲取,然後再將訊息推送給使用者註冊的監聽器處理。這種設計不僅保證了訊息的及時處理,還簡化了使用者的使用體驗。

所以,RocketMQ 消費訊息表面上是推模式(Push),但實際上是基於拉模式(Pull)實現的。這種混合模式結合了兩種模式的優點,使得 RocketMQ 在訊息消費方面具有很高的靈活性和效能。

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.