Kafka 以其高吞吐量、低延遲和可擴充套件性而備受青睞。無論是在實時資料分析、日誌收集還是事件驅動架構中,Kafka 都扮演著關鍵角色。
但是,如果 Kafka 使用不當,也可能會面臨效能瓶頸,影響系統的整體效率。所以,瞭解如何提升 Kafka 的執行效率?對於生產環境的使用和麪試都是至關重要的。
那麼,提升 Kafka 效能的有效手段都有哪些呢?接下來,我們一起來看。
效能調優主要手段
Kafka 效能調優的主要手段有以下幾個:
分割槽擴充套件
訊息批發送(重要)
訊息批獲取(重要)
配置調優
JVM 調優
1.分割槽擴充套件
在 Kafka 架構中,使用多分割槽(Partition)來實現資料分片功能。也就是 Kafka 會將多條訊息併發儲存到一個主題(Topic)的多個 Broker(Kafka 服務)中的多個 Partition 中,以實現並行操作的功能,極大地提高了整體系統的讀寫能力,如下圖所示:
資料分片是一種技術將大資料分割成更小、更易於管理的片段(稱為“分片”),並將分片都儲存在不同的伺服器上,從而實現了資料的水平拆分。透過資料分片,可以有效地解決單一資料庫的效能瓶頸、儲存限制以及高可用性等問題。
因此,增加更多的 Broker,擴充套件更多的分割槽 Partition 是提升 Kafka 效能的關鍵,如下圖所示:
2.訊息批發送(重要)
Kafka 預設是不支援批次傳送訊息的,然而開啟批次傳送訊息可以提升 Kafka 整體執行效率。
為什麼要批次傳送訊息?
批次傳送訊息有以下優點:
減少網路開銷:當生產者傳送訊息給 Kafka 時,如果每次只發送一條訊息,那麼就需要建立一次 TCP 連線,這涉及到三次握手的過程。而如果採用批次傳送的方式,則可以在一次 TCP 連線中傳送多條訊息,減少了網路連線建立和斷開的次數,從而降低了網路開銷。
減少 I/O 操作:批次傳送意味著一次寫入操作可以處理更多的資料。這對於磁碟 I/O 來說是一個優勢,因為一次大的寫操作比多次小的寫操作更高效。
提高吞吐量:由於減少了通訊次數,批次傳送可以提高單位時間內傳送的訊息數量,即提高了吞吐量。
那麼,想要實現 Kafka 批次訊息傳送只需要正確配置以下 3 個引數即可:
batch-size:定義了 Kafka 生產者嘗試批次傳送的訊息的最大大小(以位元組為單位),生產者收集到足夠多的訊息達到這個大小時,它會嘗試傳送這些訊息給 Kafka Broker,預設值為 16KB。
buffer-memory:指定了 Kafka 生產者可以用來緩衝待發送訊息的總記憶體空間,如果生產者試圖傳送的訊息超過了這個限制,生產者將會阻塞,直到有足夠空間可用或者訊息被髮送出去,預設值為 32MB。
linger.ms:生產者在嘗試傳送訊息前等待的最長時間(以毫秒為單位)。預設情況下,linger.ms 的值為 0,這意味著立即傳送。
以上 3 個引數滿足任一個都會立即(批次)傳送。
因此我們如果需要匹配發送,主要需要調整的引數是 linger.ms,如下配置所示:
spring: kafka: bootstrap-servers: localhost:9092 # Kafka伺服器地址 consumer: group-id: my-group # 消費者組ID auto-offset-reset: earliest # 自動重置偏移量到最早的可用訊息 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 鍵的反序列化器 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化器 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer # 鍵的序列化器 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化器 batch-size: 16384 buffer-memory: 33554432 properties: linger: ms: 2000
3.訊息批獲取(重要)
Kafka 預設每次拉取一條訊息,而使用批次獲取訊息可以有效提升 Kafka 執行效率。
為什麼要批次獲取訊息?
批次獲取訊息有以下優點:
降低客戶端處理開銷:對於客戶端來說,每次處理一個訊息需要進行一系列的操作,如解包、解析、處理邏輯等。如果每次只拉取一個訊息,客戶端會頻繁地進行這些操作,帶來較大的處理開銷。而批次拉取訊息時,客戶端可以一次性處理多個訊息,減少了處理單個訊息的頻率,從而降低了客戶端的處理開銷。
減少網路往返次數:每次拉取一個訊息時,客戶端需要與 Kafka 伺服器進行多次網路往返,包括髮送請求、接收響應等。這些網路往返會帶來一定的延遲。而批次拉取訊息時,客戶端可以一次性拉取多個訊息,減少了網路往返的次數,從而降低了網路延遲。
最佳化記憶體使用:批次拉取訊息可以更好地規劃和利用記憶體。客戶端可以一次性分配足夠的記憶體來儲存批次拉取的訊息,避免了頻繁地分配和釋放小記憶體塊的操作。這樣可以提高記憶體的使用效率,減少記憶體碎片的產生,進而提升系統的執行效率。
提高吞吐量:批次拉取訊息可以提高單位時間內處理的訊息數量,從而提升了 Kafka 的吞吐量。
想要實現批次讀取資料需要做以下兩步調整:
在配置檔案中設定批讀取:
spring.kafka.listener.type=batch
消費者使用 List<ConsumerRecord> 接收訊息,具體實現程式碼如下:
@KafkaListener(topics = TOPIC) public void listen(List<ConsumerRecord<?, ?>> consumerRecords) { for (int i = 0; i < consumerRecords.size(); i++) { System.out.println("監聽到訊息:" + consumerRecords.get(i).value()); } System.out.println("------------end------------"); }
以上程式的執行結果如下:
從執行結果可以看出:只有一個“end”列印,這說明 Kafka 一次拉取了一批資料,而不是一個數據,否則就會有多個“end”。
4.配置調優
合理設定 Kafka 的配置也可以一定程度的提升 Kafka 的效率,例如以下這些配置:
配置檔案刷盤策略:調整 flush.ms 和 flush.messages 引數,控制資料何時寫入磁碟。較小的值可以降低延遲,而較大的值可以提高吞吐量。
網路和 IO 操作執行緒配置最佳化:num.network.threads 應該設定為 CPU 核心數加 1,以充分利用硬體資源。調整 socket.send.buffer.bytes 和 socket.receive.buffer.bytes 以最佳化網路緩衝區大小,緩衝區越大,吞吐量也越高。
5.JVM 調優
因為 Kafka 是用 Java 和 Scala 兩種語言編寫的,而 Java 和 Scala 都是執行在 JVM 上的,因此保證 JVM 的高效執行,設定合理的垃圾回收器,也能間接的保證 Kafka 的執行效率。例如,對於大記憶體機器,可以使用 G1 垃圾收集器來減少 GC 暫停時間,併爲作業系統留出足夠的記憶體用於頁面快取。