一、groupid的定義
在使用Kafka的時(shí)候,我們經(jīng)常會看到group.id這個(gè)配置項(xiàng),它是一個(gè)字符串類型的配置項(xiàng)。具體來說,每個(gè)消費(fèi)者都有一個(gè)group id,一般情況下我們可以將同樣處理某個(gè)數(shù)據(jù)源的消費(fèi)者放置在一組中,使用group id進(jìn)行標(biāo)識。
舉個(gè)例子,如果你有一個(gè)在多個(gè)地方運(yùn)行的日志處理程序,每個(gè)程序都會處理某個(gè)topic的消息,那么你可以用相同的group id來標(biāo)識這個(gè)處理組,以確保傳遞給組中的每個(gè)處理程序的消息是唯一的。
二、groupid的作用
Kafka通過group id分配消費(fèi)者之間的消息,確保一個(gè)組內(nèi)的消費(fèi)者不會接收到相同的消息。當(dāng)同一個(gè)group id下的多個(gè)消費(fèi)者訂閱了同一個(gè)topic時(shí),每個(gè)消息將只能被一個(gè)消費(fèi)者消費(fèi)。
在多個(gè)消費(fèi)者共同消費(fèi)一個(gè)topic的場景下,可以通過groupid來做load balance,即通過groupid的設(shè)置,部署多個(gè)消費(fèi)者實(shí)例來對消息進(jìn)行消費(fèi)。
三、groupid的注意事項(xiàng)
1、group id需要唯一
在同一個(gè)Kafka集群中,group id需要唯一,如果兩個(gè)group使用了相同的groupid,它們就會消費(fèi)相同的消息,造成消息的重復(fù)消費(fèi)。
2、重新啟動(dòng)后,groupid也需要唯一
如果在同一個(gè)group中,消費(fèi)者重啟或新加入消費(fèi)者組,那么每次加入新消費(fèi)者之前,需要確保添加的消費(fèi)者的group id在之前沒有被使用過。
3、group id的更改會導(dǎo)致消費(fèi)者重新從頭開始消費(fèi)
Kafka集群會為group id下的每個(gè)消費(fèi)者保存消費(fèi)的偏移量,如果group id被更改,消費(fèi)者將會從頭開始消費(fèi)。
四、實(shí)例代碼
// 配置項(xiàng)
properties.put("group.id", "test-group");
// 創(chuàng)建消費(fèi)者
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// 訂閱topic
consumer.subscribe(Arrays.asList("test-topic"));
// 消費(fèi)消息
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
五、小結(jié)
Kafka是一個(gè)分布式的消息隊(duì)列,通過group id來保證消費(fèi)者組內(nèi)的消息處理具有唯一性,可以做到消息的負(fù)載均衡和處理組內(nèi)消息的互斥性。在使用時(shí)需要注意group id的唯一性以及更改group id的影響等問題。