消费者群组用于协调消费者从多个主题或单个主题的多个分区读取消息。可以用kafka-consumer-groups.sh
来管理和查看消费者群组信息,比如列出所有消费者群组、查看特定的消费者群组、删除多个或特定的消费者群组以及重置消费者群组偏移量。
使用--list
参数可以列出消费者群组。使用kafka-consumer-groups.sh
创建的临时消费者将以console-consumer-<generated_id>
的格式列出来:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
myGroup
使用--describe
,并用--group
指定特定的群组,就可以获取这个群组的详细信息。它会列出消费者群组读取的所有主题和分区的信息以及每个分区的偏移量信息:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
myGroup myTopic 4 - 1 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 3 - 0 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 0 66 66 0 rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 8 - 1 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 7 - 4 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 1 5 5 0 rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 6 - 0 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 2 6 7 1 rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 9 - 0 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup myTopic 5 - 0 - rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
输出结果字段说明:
可以用 --delete
命令来删除消费者群组。它将删除整个群组,包括所有已保存的偏移量。在删除群组之前,必须将群组里所有的消费者都关闭。如果你试图删除一个非空的群组,那么它将抛出“群组不为空”异常。也可以用这个命令删除单个主题的偏移量,只是需要额外提供 --topic
参数,并指定要删除的偏移量。
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group myGroup
Deletion of requested consumer groups ('myGroup') was successful.
除了显示和删除消费者群组的偏移量,还可以批量获取和保存消费者群组的偏移量,这在重置消费者偏移量时非常有用。当消费者因为某些原因需要重新读取消息或因为无法正常处理某些消息(比如格式错误的消息)需要跳过这些消息时就可以进行偏移量重置。
要将消费者群组的偏移量导出到CSV
文件,可以使用 --reset-offsets
参数和 --dry-run
参数。这样就可以将当前的偏移量导出到文件,并在需要的时候用于导入或回滚偏移量。导出的CSV
文件格式如下所示:
<topic-name>
,<partition-number>
,<offset>
如果没有指定 --dry-run
参数,那么偏移量将被重置,所以在执行这个命令时要十分小心。
Kafka
将会报错:Error: Assignments can only be reset if the group 'myGroup' is inactive, but the current state is Stable.
下面的例子会将消费者群组myGroup
读取的myTopic
主题的偏移量导出到一个叫作offsets.csv的文件中:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --export --group myGroup --topic myTopic --reset-offsets --to-current --dry-run > offsets.csv
$ cat offsets.csv
myTopic,4,3
myTopic,3,2
myTopic,0,69
myTopic,8,1
myTopic,7,6
myTopic,1,7
myTopic,6,1
myTopic,2,8
myTopic,9,1
myTopic,5,0
与导出偏移量刚好相反,导入偏移量会用之前导出的文件来重置消费者群组的偏移量。一般情况下,我们会先导出消费者群组的当前偏移量,并将导出的文件复制一份(作为备份),然后将文件中的偏移量修改成想要的值。
导入上面导出的offsets.csv
文件,以此来重置消费者群组myGroup
的偏移量:
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group myGroup --topic myTopic --reset-offsets --from-file ./offsets.csv --execute
GROUP TOPIC PARTITION NEW-OFFSET
myGroup myTopic 4 3
myGroup myTopic 3 2
myGroup myTopic 0 69
myGroup myTopic 8 1
myGroup myTopic 7 6
myGroup myTopic 1 7
myGroup myTopic 6 1
myGroup myTopic 2 8
myGroup myTopic 9 1
myGroup myTopic 5 0