TrumanWong

Kafka管理命令(一):主题操作

TrumanWong
5/22/2024

管理Kafka集群意味着需要使用额外的工具对主题、配置等做出修改。Kafka提供了一些命令行工具,以用于对集群做出变更。这些工具使用Java类实现,Kafka提供了一些脚本来调用它们。不过,这些工具只提供了基本的功能,无法完成复杂的操作,难以被用于管理大规模的集群。

虽然Kafka为主题操作提供了身份验证和授权机制,但默认配置并没有对这些工具的使用进行严格的限制。也就是说,不需要经过身份验证也可以使用这些工具,并可以在没有安全检查和审计的情况下执行诸如修改主题之类的操作。为防止发生未经授权的变更,需要确保只有管理员可以使用这些工具

创建新主题

使用 --create命令创建新主题时,必须提供几个参数,即使其中一些参数可能已经有了broker级别的默认值。还可以通过 --config选项指定其他参数和覆盖默认值。以下是必须提供的3个参数:

  • --topic:想要创建的主题的名字。
  • --replication-factor:主题的副本数量。
  • --partitions:主题的分区数量。

主题的名字可以包含字母数字字符、下划线、破折号和点号,但不建议使用点号。Kafka的内部会将点号转成下划线(例如,topic.1在指标中会变成topic_1),这可能会导致主题名称冲突。

另外,避免使用双下划线作为主题名字的前缀。根据约定,Kafka内部主题的名字以双下划线开头(例如,用于保存消费者组偏移量的 __consumer_offsets主题)。因此,为了避免冲突,不建议使用以双下划线开头的主题名字。

创建新主题命令如下:

$ kafka-topics.sh --bootstrap-server <connection-string>:<port> --create --topic <string> --replication-factor <integer> --partitions <integer>

这个命令将创建一个新主题,主题名字为指定的值,并包含了指定数量的分区。集群会为每个分区创建指定数量的副本。如果为集群指定了基于机架信息的副本分配策略,那么分区的副本就会被放置在不同的机架上。如果不需要这种分配策略,则可以指定 --disable-rack-aware选项。

$ kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --replication-factor 1 --partitions 4

可以指定 --if-not-exists选项,这样即使要创建的主题已经存在也不会抛出异常。

虽然Kafka--alter命令提供了 --if-exists参数,但不建议使用它。这是因为如果指定了这个选项,并且要修改的主题不存在,那么 --alter命令就不会返回任何错误,从而掩盖了本应创建这个不存在的主题的错误。

列出集群中的所有主题

可以用 --list命令列出集群中的所有主题。每个主题占用一行输出,主题之间没有特定的顺序。

$ kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
my-topic
myTopic

这里也列出了内部主题 __consumer_offsets。可以用 --exclude-internal参数将名字以双下划线开头的内部主题排除。

查看主题详情

通过--describe参数可以获取所有主题的详细信息。详细信息里包含了分区数量、主题的覆盖配置以及每个分区的副本清单。也可以指定 --topic参数,只列出指定主题的详细信息。

$ kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
Topic: my-topic	TopicId: 2rqf4_D2RveIw1LPisdtSQ	PartitionCount: 4	ReplicationFactor: 1	Configs: 
    Topic: my-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 3	Leader: 0	Replicas: 0	Isr: 0

--describe命令也有一些用于过滤输出结果的参数,它们在诊断集群问题时非常有用。我们通常不会为这些命令指定 --topic参数,因为我们的目的是要找出集群中所有满足条件的主题和分区。这个参数对list命令不管用。下面是其他一些有用的参数:

  • --topics-with-overrides

    这个参数只会列出配置参数与集群默认值不同的主题。

  • --exclude-internal

    这是之前提到的参数,它可以将所有名字以双下划线开头的主题排除。

下面的参数可用于找出有问题的分区。

  • --under-replicated-partitions

    这个参数可以找出一个或多个副本与首领不同步的分区。这并不一定是坏事,因为集群维护、部署和再均衡都会导致分区副本不同步,但还是要注意一下。

  • --at-min-isr-partitions

    这个参数可以找出副本数量(包括首领在内)与配置的最少同步副本(ISR)数完全匹配的分区。这些主题对生产者客户端或消费者客户端来说仍然可用,但已无冗余,有会变得不可用的风险。

  • --under-min-isr-partitions

    这个参数可以找出ISR数低于配置的最小值的分区。这些分区实际上处于只读模式,不能向其写入数据。

  • --unavailable-partitions

    这个参数可以找出所有没有首领的分区。这种情况很严重,说明分区已离线,对生产者客户端或消费者客户端来说已经是不可用的。

增加分区

有时候需要增加主题的分区数量。分区是主题进行伸缩和复制的基础,增加分区通常是为了通过降低单个分区的吞吐量来扩展主题容量。如果要在单个消费者群组内运行更多的消费者,则分区数量也需要相应增加,因为一个分区只能被同一个群组里的一个消费者读取

# 使用--alter参数将分区增加到8
$ kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my-topi --partitions 8
# 查看是否生效
$ kafka-topics.sh --bootstrap-server localhost.com:9092 --describe --topic my-topic 
Topic: my-topic	TopicId: 2rqf4_D2RveIw1LPisdtSQ	PartitionCount: 8	ReplicationFactor: 1	Configs: 
    Topic: my-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 3	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 4	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 5	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 6	Leader: 0	Replicas: 0	Isr: 0
    Topic: my-topic	Partition: 7	Leader: 0	Replicas: 0	Isr: 0

从消费者角度来看,为包含键的主题添加分区是很困难的。这是因为如果改变了分区数量,那么键与分区之间的映射也会发生变化。因此,对于包含键的主题,建议一开始就设置好分区数量,避免后续对其进行调整。

减少分区

Kafka不支持减少主题的分区数量。如果删除了主题的一个分区,那么这个分区里的数据也会被删除,导致客户端看到的数据不一致。此外,将数据重新分布到其余的分区中是非常困难的,即使能够做到,也无法保证消息的顺序。如果要减少分区数量,则建议删除整个主题并重新创建,或者(如果不能删除的话)创建一个新主题(比如叫作my-topic-v2),并将所有流量重定向到新主题。

删除主题

即使一个主题里没有任何消息,仍然会占用集群资源(比如磁盘空间、文件句柄和内存)。控制器中也有一些垃圾元数据,这些垃圾元数据在大型集群中会影响集群性能。如果一个主题不再被使用,那么可以将其删除,以便释放出被占用的资源。要删除主题,brokerdelete.topic.enable参数必须被设置为true,如果设置为false,那么删除主题的请求将被忽略,也就无法删除主题

删除主题是异步操作,也就是说,要删除的主题将被打上删除标记,但可能不会立即被删除,具体取决于数据量和清理策略。控制器会尽快通知broker有挂起的删除操作(在现有的控制器任务完成之后),然后broker会让主题的元数据失效,并从磁盘上删除相应的文件。由于控制器删除主题的方式受到了限制,建议不要一次性删除多个主题,而且在开始删除其他主题之前要给当前的删除操作留有足够的时间。

删除一个主题也就删除了主题的所有数据。这是一个不可逆操作,所以在删除时要十分小心。

$ kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic

主题删除是否成功并没有可视的反馈,不过可以通过 --list参数或 --describe参数来验证。如果要删除的主题已经不在集群中,则说明删除成功。