TrumanWong

Kafka管理命令(二):消费者群组

TrumanWong
5/23/2024

消费者群组用于协调消费者从多个主题或单个主题的多个分区读取消息。可以用kafka-consumer-groups.sh来管理和查看消费者群组信息,比如列出所有消费者群组、查看特定的消费者群组、删除多个或特定的消费者群组以及重置消费者群组偏移量。

列出消费者群组

使用--list参数可以列出消费者群组。使用kafka-consumer-groups.sh创建的临时消费者将以console-consumer-<generated_id>的格式列出来:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
myGroup

查看消费者群组详情

使用--describe,并用--group指定特定的群组,就可以获取这个群组的详细信息。它会列出消费者群组读取的所有主题和分区的信息以及每个分区的偏移量信息:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup 

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST             CLIENT-ID
myGroup         myTopic         4          -               1               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         3          -               0               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         0          66              66              0               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         8          -               1               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         7          -               4               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         1          5               5               0               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         6          -               0               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         2          6               7               1               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         9          -               0               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka
myGroup         myTopic         5          -               0               -               rdkafka-a7aba4d8-94ca-4158-80cd-072e2b05651b /192.168.118.128 rdkafka

输出结果字段说明:

  • GROUP:消费者群组名称
  • TOPIC:正在读取的主题名称
  • PARTTION:正在读取的分区ID
  • CURRENT-OFFSET:消费者群组要读取的下一条消息的偏移量,也就是消费者在当前分区中的位置。
  • LOG-END-OFFSET:分区的当前高水位标记,也就是下一条要写入这个分区的消息的偏移量
  • LAGCURRENT-OFFSETLOG-END-OFFSET之间的差值
  • CONSUMER-ID:基于CLIENT-ID生成的消费者ID
  • HOST:消费者群组读取的主题所在的主机名
  • CLIENT-ID:客户端提供的用于标识客户端的ID

删除消费者群组

可以用 --delete命令来删除消费者群组。它将删除整个群组,包括所有已保存的偏移量。在删除群组之前,必须将群组里所有的消费者都关闭。如果你试图删除一个非空的群组,那么它将抛出“群组不为空”异常。也可以用这个命令删除单个主题的偏移量,只是需要额外提供 --topic参数,并指定要删除的偏移量。

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group myGroup
Deletion of requested consumer groups ('myGroup') was successful.

偏移量管理

除了显示和删除消费者群组的偏移量,还可以批量获取和保存消费者群组的偏移量,这在重置消费者偏移量时非常有用。当消费者因为某些原因需要重新读取消息或因为无法正常处理某些消息(比如格式错误的消息)需要跳过这些消息时就可以进行偏移量重置。

导出偏移量

要将消费者群组的偏移量导出到CSV文件,可以使用 --reset-offsets参数和 --dry-run参数。这样就可以将当前的偏移量导出到文件,并在需要的时候用于导入或回滚偏移量。导出的CSV文件格式如下所示:

<topic-name>,<partition-number>,<offset>

如果没有指定 --dry-run参数,那么偏移量将被重置,所以在执行这个命令时要十分小心。

注意:在导入/导出偏移量之前,必须先关闭所有的消费者。如果消费者群组处于活跃状态,Kafka将会报错:

Error: Assignments can only be reset if the group 'myGroup' is inactive, but the current state is Stable.

下面的例子会将消费者群组myGroup读取的myTopic主题的偏移量导出到一个叫作offsets.csv的文件中:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --export --group myGroup --topic myTopic --reset-offsets --to-current --dry-run > offsets.csv
$ cat offsets.csv 
myTopic,4,3
myTopic,3,2
myTopic,0,69
myTopic,8,1
myTopic,7,6
myTopic,1,7
myTopic,6,1
myTopic,2,8
myTopic,9,1
myTopic,5,0

导入偏移量

与导出偏移量刚好相反,导入偏移量会用之前导出的文件来重置消费者群组的偏移量。一般情况下,我们会先导出消费者群组的当前偏移量,并将导出的文件复制一份(作为备份),然后将文件中的偏移量修改成想要的值。

导入上面导出的offsets.csv文件,以此来重置消费者群组myGroup的偏移量:

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group myGroup --topic myTopic --reset-offsets --from-file ./offsets.csv --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
myGroup                        myTopic                        4          3              
myGroup                        myTopic                        3          2              
myGroup                        myTopic                        0          69             
myGroup                        myTopic                        8          1              
myGroup                        myTopic                        7          6              
myGroup                        myTopic                        1          7              
myGroup                        myTopic                        6          1              
myGroup                        myTopic                        2          8              
myGroup                        myTopic                        9          1              
myGroup                        myTopic                        5          0