RabbitMQ 是一個強大的訊息佇列系統,它提供了多種佇列型別以滿足不同的使用需求。本文將探討三種主要佇列型別:經典佇列、仲裁佇列和流式佇列,並討論它們的區別和選型建議。
經典佇列(Classic Queues)
簡介:
經典佇列是 RabbitMQ 中最早期也是最常用的一種佇列型別。它們具有良好的效能和穩定性,適合大多數常規的訊息傳遞場景。
特點:
儲存機制:訊息儲存在磁碟或記憶體中,支援持久化。
訊息傳遞:一旦訊息被消費者確認,訊息會從佇列中刪除。
效能:效能相對較高,但在高併發和大訊息量場景下,可能會遇到瓶頸。
高可用性:支援映象佇列,實現高可用性。映象佇列中的訊息會複製到多個節點,以防單節點故障。
適用場景:
適合大多數常規訊息傳遞場景,如任務排程、事件通知等。
當需要訊息的持久化儲存和高可靠性時,經典佇列是一個很好的選擇。
仲裁佇列(Quorum Queues)
簡介:
仲裁佇列是一種基於 Raft 協議實現的新型佇列,專為提高資料一致性和可靠性而設計。
特點:
儲存機制:訊息儲存在多個節點上,採用 Raft 協議確保資料一致性。
高可用性:天然支援高可用性,透過多節點複製實現資料冗餘。
資料一致性:仲裁佇列確保每條訊息在多個副本之間的一致性,避免單點故障導致的資料丟失。
效能:由於需要確保資料一致性,效能可能比經典佇列略低,適合對資料一致性要求較高的場景。
適用場景:
適用於對資料一致性和高可用性要求較高的場景,如金融交易、訂單處理等關鍵業務系統。
在需要確保訊息不丟失且需要高可用性的情況下,仲裁佇列是一個理想選擇。
注意事項
仲裁佇列只能宣告為持久的
仲裁佇列只能被宣告為持久的,否則會引發以下錯誤訊息: :server_initiated_close,406,“PRECONDITION_FAILED - 佇列‘my-quorum-queue’的屬性‘non-durable’無效
Quorum 佇列具有一些特殊功能和限制。它們不能是非持久的,因為 Raft 日誌始終寫入磁碟,因此它們永遠不能被宣告為瞬態的。從 3.8.2 開始,它們也不支援訊息 TTL 和訊息優先順序 2。
仲裁佇列的訊息預設就是持久化的
對mq熟悉的老鐵應該知道,佇列的持久化和訊息的持久化是分開的。一般情況下如果不對訊息宣告為持久化的,服務重啟之後訊息就會丟失。但是仲裁佇列預設訊息就是持久化的。
下面我手擼了一個簡單的demo,同時給經典佇列和仲裁佇列各發送一條訊息。
然後我們重啟服務,發現經典佇列的訊息已經丟失了,但是仲裁佇列的訊息還在佇列中。
仲裁佇列 VS 經典佇列
資料一致性
仲裁佇列使用 Raft 共識演算法來確保資料的一致性。即使在單節點情況下,仲裁佇列也會嚴格遵循日誌記錄和確認機制,確保訊息的順序和一致性。而經典佇列在單節點情況下可能會因節點故障導致資料丟失或不一致。
資料可靠性
仲裁佇列會將每條訊息記錄到持久儲存中,確保即使在系統崩潰後,訊息也不會丟失。經典佇列雖然也支援訊息持久化,但其可靠性依賴於訊息寫入磁碟的速度和節點的穩定性。
流式佇列(Stream Queues)
流式佇列是一種新型佇列,專為處理大規模數據流和高吞吐量場景設計。
特點:
儲存機制:訊息以流的形式儲存,可以實現訊息的回放和重複消費。
高吞吐量:設計上最佳化了高吞吐量和低延遲,適合處理大規模實時數據流。
資料永續性:訊息可以持久化儲存,支援長時間的訊息保留和回溯。
訂閱機制:支援多種訂閱模式,允許多個消費者按需訂閱訊息流。
什麼是訊息回放和重複消費?
訊息回放:允許消費者在任何時間點重新讀取過去的訊息。這對於需要重現歷史事件或進行審計的應用程式特別有用。
重複消費:消費者可以多次消費同一條訊息,這在除錯和處理異常時尤為重要。
下面我們透過一個簡單的例子看看重複消費
public void InitStreamMQ() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "user", Password = "myrabbit" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); // 宣告流式佇列 var args = new Dictionary<string, object> { { "x-queue-type", "stream" } }; channel.QueueDeclare(queue: "stream_queue", durable: true, exclusive: false, autoDelete: false, arguments: args); channel.QueueBind(queue: "stream_queue", exchange: "amq.direct", routingKey: "stream_queue"); } [ActionTitle(Name = "訂閱佇列")] [Route("subscribe")] public void SubscribeQuorumMessage() { var factory = new ConnectionFactory() { HostName = "localhost", VirtualHost = "/", UserName = "user", Password = "myrabbit" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // 設定消費者,從指定的偏移量消費訊息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; /** * x-stream-offset的可選值有以下幾種: first: 從日誌佇列中第一個可消費的訊息開始消費 last: 消費訊息日誌中最後一個訊息 next: 相當於不指定offset,消費不到訊息。 Offset: 一個數字型的偏移量 Timestamp:一個代表時間的Data型別變數,表示從這個時間點開始消費。例如 一個小時前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000) */ var args = new Dictionary<string, object> { { "x-stream-offset", 2 } }; channel.BasicConsume(queue: "stream_queue", autoAck: false, arguments: args, consumer: consumer); }
這裏我們往流式佇列裡面傳送了10條訊息但是每次消費的時候都從第3條訊息(offset為2)開始消費.
流式佇列的工作原理
流式佇列的工作方式類似於日誌系統(如 Apache Kafka)。訊息按照順序追加到佇列的末尾,並儲存在磁碟上。每個訊息都有一個唯一的偏移量(offset),消費者可以透過指定偏移量來讀取特定的訊息或重新消費訊息。
適用場景:
適用於實時資料分析、日誌處理、實時監控等場景。
在需要處理大規模數據流和高吞吐量的場景下,流式佇列是一個合適的選擇。
PS
Auto-Ack 必須為 false
在流式佇列中,爲了確保訊息的可靠性和能夠實現訊息回放,自動確認(autoAck)必須設定為 false。自動確認會導致訊息一旦被消費即刻從佇列中移除,失去訊息的永續性和回放功能。
必須設定prefetchCount
流式佇列(Stream Queue)在 RabbitMQ 中主要設計用於高吞吐量和低延遲的訊息傳輸。設定 prefetchCount(每次傳送給消費者的訊息數量)是爲了最佳化流式佇列的效能和資源使用
durable必須設定為true
與Quorum佇列類似, Stream佇列的durable引數必須宣告為true,exclusive引數必須宣告為false。這其中,x-max-length-bytes 表示日誌檔案的最大位元組數。x-stream-max-segment-size-bytes 每一個日誌檔案的最大大小。這兩個是可選引數,通常爲了防止stream日誌無限制累計,都會配合stream佇列一起宣告。
選型建議
在選擇 RabbitMQ 佇列型別時,需要根據具體的業務需求和場景來決定。以下是一些選型建議:
經典佇列:
- 適合大多數常規的訊息傳遞需求。
- 需要較高的效能和可靠性,但不需要特別高的資料一致性要求。
仲裁佇列:
- 適用於對資料一致性和高可用性要求較高的場景。
- 需要確保訊息不丟失且能夠在多節點間保持資料一致性。
流式佇列:
- 適合處理大規模實時數據流和高吞吐量的場景。
- 需要訊息的回放和重複消費功能,適用於實時資料分析和日誌處理等場景。
總結
透過瞭解經典佇列、仲裁佇列和流式佇列的特點和應用場景,能夠更好地選擇適合自己業務需求的佇列型別。在實際應用中,可以根據具體的業務需求和效能要求,靈活地選擇和配置 RabbitMQ 佇列,以實現最佳的訊息傳遞效果。