我們在使用 Kafka 時,最簡單、最常用的方式是隻設定 topic(主題)和 value(訊息體),如下所示:
這樣的話獲取訊息的程式碼也很簡單,如下所示:
java程式碼解讀複製程式碼@KafkaListener(topics = "mytopic", groupId = "my-group") public void listen(String data) { System.out.println("監聽到訊息:" + data); }
但是,除了我們可以設定和傳遞 topic 和 value 之外,我們還可以傳遞 key,如下圖所示:
那問題來了,傳送訊息時設定這個 key 有什麼用呢?
key的作用
傳送訊息時,設定 key 的作用如下:
1.決定分割槽
當生產者傳送訊息時,如果指定了 key,Kafka 會根據 key 的 hash 值來決定這條訊息應該傳送到哪個分割槽。
如果沒有指定 key,Kafka 會採用輪詢(早期版本)或隨機(最新版本)的方式將訊息分配到其他分割槽中。
分割槽的具體實現原始碼在 DefaultPartitioner 中 partition 方法中體現,核心原始碼如下:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) { return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : BuiltInPartitioner.partitionForKey(keyBytes, numPartitions); }
指定 key 之後的分割槽實現程式碼如下:
public static int partitionForKey(byte[] serializedKey, int numPartitions) { return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; }
以上原始碼的大概含義是:使用 MurmurHash2 演算法對位元組陣列 serializedKey 進行雜湊運算,並將其結果轉換為正數,然後對 numPartitions 取模,以確定鍵在分割槽中的位置,返回值表示鍵所在的分割槽編號。
所以,從上述原始碼可以看出,傳送訊息如果設定了 key 之後,會將相同 key 放到同一個分割槽中。
2.保證訊息順序
在 Kafka 中,同一個分割槽中的訊息是有序的。而相同的 key,根據上面的分割槽演算法可知,它們會存放到同一個分割槽,這樣就能保證訊息的有序性了。
3.訊息過濾
對於某些應用場景,消費者可以根據訊息的鍵來進行過濾或聚合操作。例如,在實時資料分析場景中,可能需要對具有相同鍵的訊息進行分組處理。
Kafka 設定了 key 之後,可以透過以下方式實現訊息過濾,如下程式碼所示:
@KafkaListener(topics = "topicName", groupId = "groupId") public void listen(String message, ConsumerRecord<?,?> record) { Object key = record.key(); if (key instanceof String && ((String) key).matches("regexPattern")) { // 處理滿足正規表示式條件的訊息 } }
也就是,我們在接收到訊息之後,透過對 key 的正則匹配實現訊息的過濾和聚合等操作。