我們使用 Kafka 的時候,怎樣能保證不丟失訊息呢?今天來聊一聊這個話題。
首先我們看一下 Kafka 的架構圖,
場景一:非同步傳送
Producer 非同步傳送是丟失訊息比較多的場景,Kafka 非同步傳送的程式碼如下:
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); RecordMetadata metadata = producer.send(record).get();
Producer 傳送訊息後,不用等待發送結果,就可以繼續執行後面的邏輯。如果傳送失敗,就會丟失訊息。
Kafka 提供了回撥方法,可以同步等待發送結果,這樣降低了傳送效率,但可以對傳送失敗的場景進行處理,比如重新發送。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(record, (Callback) (metadata, e) -> { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } });
場景二:配置 acks=0
從文章開頭的架構圖中可以看到,Broker Leader 節點收到訊息後,會同步給 Follower 節點。
在 Producer 端有一個 acks 配置,說明如下 :
acks=0:Producer 傳送訊息後不等待 Broker 的響應;
acks=1:Producer 傳送訊息後,Leader 節點寫入訊息成功後給 Producer 傳送響應;
acks=all/-1:Producer 傳送訊息後,需要 ISR 列表中所有 Broker 節點都寫入訊息成功纔會給 Producer 傳送響應。
注意:acks=all/-1 是最高安全級別,可以配合 min.insync.replicas 引數使用,當 acks=all/-1 時,min.insync.replicas 表示 ISR 列表中最小寫入訊息成功的副本數。
如下圖,cks=all/-1,當 min.insync.replicas=2 時,
如果 ISR 列表中有【Broker0、Broker1】,即使 Broker2 寫入訊息失敗,也會給 Producer 返回成功。
如果 ISR 列表中只有【Broker0】,則無論如何都不會給 Producer 返回成功。
如果 ISR 列表中有【Broker0、Broker1、Broker2】,則 3 個 Broker 都寫成功纔會給 Producer 返回成功。
場景三:傳送端重試
如果配置 retries=0,Producer 傳送訊息失敗後是不會進行重試的,要保證訊息不丟失,可以增加 retries 的配置值,避免因為網路抖動而造成的傳送失敗。
場景四:Follower 落後太多
Kafka Broker 有一個引數:unclean.leader.election.enable,這個引數值說明如下:
true:允許 ISR 列表之外的節點參與競選 Leader;
false:不允許 ISR 列表之外的節點參與競選 Leader。
如果設定為 true,也是會丟失訊息的,看下圖:
如果 Leader 和 Follower1 都掛了,這時就要考慮是否讓 Follower2 參加競選,把 unclean.leader.election.enable 引數值設定為 true,則 Follower2 也可以競選 Leader,並且作為唯一存活節點成功競選為 Leader,但是它並沒有同步到偏移量為 3、4、5 的訊息,
而之前的 Leader 上線後,成爲了 Follower,因為 Follower 的 LEO(Log End Offset)不能大於 Leader,所以之前偏移量為 3、4、5 的訊息就被丟棄了。如下圖:
所以,要保證訊息不丟失,unclean.leader.election.enable 這個引數值要設定為 false。
場景五:Broker 宕機
爲了提升效能,Kafka 使用 Page Cache,先將訊息寫入 Page Cache,採用了非同步刷盤機制去把訊息儲存到磁碟。如果刷盤之前,Broker Leader 節點宕機了,並且沒有 Follower 節點可以切換成 Leader,則 Leader 重啟後這部分未刷盤的訊息就會丟失。
這種場景下多設定副本數是一個好的選擇,通常的做法是設定 replication.factor >= 3,這樣每個 Partition 就會有 3 個以上 Broker 副本來儲存訊息,同時宕機的機率很低。
同時可以配合場景二中的引數 min.insync.replicas > 1(不建議使用預設值 1),表示訊息至少要被成功寫入到 2 個 Broker 副本纔算是傳送成功。
注意:引數配置要保證 replication.factor > min.insync.replicas,通常設定成 replication.factor = min.insync.replicas + 1。如果這 2 個引數設定成相等,則只要有一個 Broker 節點宕機,Broker 就無法給 Producer 返回傳送成功,系統可用性降低。
場景六:併發消費
如果消費端採用多執行緒併發消費,很容易因為併發更新 Offset 導致消費失敗。看下圖:
執行緒 1 拉取 3 條訊息把 Offset 更新成 3,執行緒 2 把 Offset 更新成 6,執行緒 3 把 Offset 更新成 9。這時如果執行緒 2 消費失敗了,想要重新消費,但是 Offset 已經更新到了 9,不能拉取到 Offset 9 以前的訊息了。
所以,消費者併發消費很可能會造成訊息丟失,如果對訊息丟失很敏感,最好使用單執行緒來進行消費。
如果採用多執行緒,可以把 enable.auto.commit 設定為 false,這樣相當於每次消費完後手動更新 Offset。不過這又會帶來重複消費問題,比如上面的例子,如果執行緒 2 消費失敗了,則手動把 Offset 更新成 3,執行緒 3 消費成功後,再次拉取,還會拉取到 6、7、8 這三條資料。因此消費端需要做好冪等處理。
總結
本文介紹了 Kafka 丟失訊息的六個場景,使用 Kafka 時需要根據實際情況制定解決方案,希望本文介紹的場能夠對你有所幫助。