TrumanWong

Kafka配置broker

TrumanWong
5/20/2024

Kafka配置broker

Kafka发行版中自带的配置示例可以用来安装单机服务,主要用于概念验证,并不能满足大型集群的配置需求。Kafka有很多配置参数,涉及安装和调优的方方面面,其中的大多数参数可以使用默认值,除非你对调优有特别的需求。

常规配置参数

一些配置参数在单机安装时可以使用默认值,但部署到其他环境中时要格外小心。这些参数是针对单台服务器最基本的配置,其中大多数参数需要经过修改后才能用在集群中。

broker.id

每个broker都需要有一个整数标识符,该标识符是使用broker.id指定的。它的默认值是0,但可以被设置成其他任意整数。这个值在整个Kafka集群中必须是唯一的,并且可以在服务器节点间移动。建议把ID设置成与主机名具有相关性的整数,这样就可以很容易地将ID与主机名映射起来。如果主机名包含唯一性的数字(比如host1.example.comhost2.example.com等),那么用12这些数字来设置broker.id就再好不过了。

listeners

listeners配置参数是一个用逗号分隔的URI列表,也就是要监听的地址和端口。如果没有为监听器指定安全协议,则还需要额外配置listener.security.protocol.map参数。监听器的格式为<protocol>://<hostname>:<port>,例如,PLAINTEXT://localhost:9092,SSL://:9091就是一个合法的配置。如果主机名是0.0.0.0,那么将绑定所有的网络接口地址。如果主机名为空,那么将绑定默认的网络接口地址。需要注意的是,如果指定的端口号小于1024,则必须用root权限启动Kafka,但不建议这么配置

zookeeper.connect

用于保存broker元数据的ZooKeeper地址是通过zookeeper.connect来指定的。示例配置使用了一个运行在2181端口上的ZooKeeper,所以指定了localhost:2181。这个参数的值是用逗号分隔的一组hostname:port/path,每一部分的含义如下。

  • hostname

    ZooKeeper服务器的主机名或IP地址。

  • port

    ZooKeeper的客户端连接端口。

  • /path

    可选的ZooKeeper路径,以作为Kafka集群的chroot。如果不指定,则默认使用根路径。如果指定的chroot路径(一般作为应用程序的根目录)不存在,那么broker会在启动时创建它。

log.dirs

Kafka把所有消息都保存在磁盘上,存放日志片段的目录是通过log.dir来指定的。如果有多个目录,则可以用log.dirs来指定。如果没有指定这个参数,则默认使用log.dir。***log.dirs是一组用逗号分隔的本地文件系统路径。如果指定了多条路径,那么broker会根据“最少使用”原则,把同一个分区的日志片段保存到同一条路径下。***需要注意的是,broker会向分区数量最少的目录新增分区,而不是向可用磁盘空间最小的目录新增分区,所以并不能保证数据会被均匀地分布在多个目录中。

num.recovery.threads.per.data.dir

Kafka使用线程池来处理日志片段。目前,线程池被用于以下3种情形:

  • 当服务器正常启动时,用于打开每个分区的日志片段。
  • 当服务器正常启动时,用于打开每个分区的日志片段。
  • 当服务器正常启动时,用于打开每个分区的日志片段。

在默认情况下,每个日志目录只使用一个线程。因为这些线程只在服务器启动和关闭时使用,所以可以多设置一些线程来实现并行操作。特别是对包含大量分区的服务器来说,一旦发生崩溃,在从错误中恢复时可以通过并行操作省下数小时的时间。需要注意的是,这个参数对应的是log.dirs中的一个目录,也就是说,如果num.recovery.threads.per.data.dir被设为8,并且log.dirs指定了3条路径,那么总共需要24个线程

auto.create.topics.enable

在默认情况下,Kafka会在如下几种情形中自动创建主题:

  • 在默认情况下,Kafka会在如下几种情形中自动创建主题。
  • 在默认情况下,Kafka会在如下几种情形中自动创建主题。
  • 在默认情况下,Kafka会在如下几种情形中自动创建主题。

根据Kafka协议,如果一个主题不提前被创建,则无法知道它是否存在。如果你选择通过手动的方式或其他配置系统来显式地创建主题,那么可以把auto.create.topics.enable设为false

auto.leader.rebalance.enable

为了确保主题的所有权不会集中在一台broker上,可以将这个参数设置为true,让主题的所有权尽可能地在集群中保持均衡。如果启用了这个功能,那么就会有一个后台线程定期检查分区的分布情况(这个时间间隔可以通过leader.imbalance.check.interval.seconds来配置)。如果不均衡的所有权超出了leader.imbalance.per.broker.percentage指定的百分比,则会启动一次分区首领再均衡。

delete.topic.enable

根据应用场景和数据保留策略的不同,你可能希望将集群锁定,以防主题被随意删除。把这个参数设置为false就可以禁用主题删除功能。

主题的默认配置

Kafka为新创建的主题提供了很多默认的配置参数。

num.partitions

num.partitions参数指定了新创建的主题将包含多少个分区,特别是如果启用了主题自动创建功能(默认是启用的),那么主题的分区数就是这个参数指定的值。默认是1个分区。需要注意的是,可以增加主题的分区数,但不能减少。所以,如果要让一个主题的分区数小于num.partitions指定的值,则需要手动创建主题。

default.replication.factor

如果启用了自动创建主题功能,那么这个参数的值就是新创建主题的复制系数。

log.retention.ms

Kafka通常根据配置的时间长短来决定数据可以被保留多久。我们使用log.retention.hours参数来配置时间,默认为168小时,也就是1周。除此以外,还有另外两个参数log.retention.minuteslog.retention.ms。这3个参数的作用是一样的(都是用于确定消息将在多久以后被删除),不过还是推荐使用log.retention.ms,因为如果指定了不止一个参数,那么Kafka会优先使用具有最小单位值的那个。

log.retention.bytes

另一种数据保留策略是通过计算已保留的消息的字节总数来判断旧消息是否过期。这个字节总数阈值通过参数log.retention.bytes来指定,对应的是每一个分区。也就是说,如果一个主题包含8个分区,并且log.retention.bytes被设置为1 GB,那么这个主题最多可以保留8 GB的数据。需要注意的是,所有保留都是针对单个分区而不是主题执行的

所以,如果配置了这个参数,那么当主题增加了新分区,整个主题可以保留的数据也会随之增加。如果这个值被设置为–1,那么分区就可以无限期地保留数据

如果同时指定了log.retention.byteslog.retention.ms(或另一个按时间保留的参数),那么只要任意一个条件得到满足,消息就会被删除。假设log.retention.ms被设置为86 400 000(也就是1天),log.retention.bytes被设置为1 000 000 000(也就是1 GB),如果消息字节总数不到一天就超过了1 GB,那么旧数据就会被删除。相反,如果消息字节总数小于1 GB,那么一天之后这些消息也会被删除,尽管分区的数据总量小于1 GB。为简单起见,建议只选择其中的一种保留策略,要么基于数据大小,要么基于时间,或者两种都不选择,以防发生意外的数据丢失。不过,对于复杂的场景,可以两种都使用。

log.segment.bytes

前面介绍的参数都作用在日志片段而不是单条消息上。当消息到达broker时,它们会被追加到分区的当前日志片段上。当日志片段大小达到log.segment.bytes指定的上限(默认是1 GB)时,当前日志片段会被关闭,一个新的日志片段会被打开。一旦日志片段被关闭,就可以开始进入过期倒计时。这个参数的值越小,关闭和分配新文件就会越频繁,从而降低整体的磁盘写入效率。

如果主题的消息量不是很大,那么如何设置这个参数就变得尤为重要。如果一个主题每天只接收100 MB的消息,并且log.segment.bytes使用了默认设置,那么填满一个日志片段将需要10天。因为在日志片段被关闭之前消息是不会过期的,所以如果log.retention.ms被设为604 800 000(也就是1周),那么日志片段最多需要17天才会过期。这是因为关闭日志片段需要10天,而根据配置的过期时间,还需要再保留数据7天(要等到日志片段的最后一条消息过期才能将其删除)。

log.roll.ms

另一个可用于控制日志片段关闭时间的参数是log.roll.ms,它指定了多长时间之后日志片段可以被关闭。就像log.retention.byteslog.retention.ms一样,log.segment.byteslog.roll.ms并不互斥。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。在默认情况下,log.roll.ms没有设定值,所以使用log.roll.hours设定的默认值——168小时,也就是7天。

在使用基于时间的日志片段时,需要考虑并行关闭多个日志片段对磁盘性能的影响。如果多个分区的日志片段一直未达到大小的上限,就会出现这种情况。这是因为broker在启动时会计算日志片段的过期时间,一旦满足条件,就会并行关闭它们,尽管它们的数据量可能很少。

min.insync.replicas

为了提升集群的数据持久性,可以将min.insync.replicas设置为2,确保至少有两个副本跟生产者保持“同步”。生产者需要配合将ack设置为all,这样就可以确保至少有两个副本(首领和另一个副本)确认写入成功,从而防止在以下情况下丢失数据:首领确认写入,然后发生停机,所有权被转移到一个副本,但这个副本没有写入成功。如果没有这些配置,则生产者会认为已经写入成功,但实际上消息丢失了。不过,这样做是有副作用的,因为需要额外的开销,所以效率会有所降低。因此,对于能够容忍偶尔消息丢失的高吞吐量集群,不建议修改这个参数的默认值。

message.max.bytes

broker通过设置message.max.bytes参数来限制单条消息的大小,默认值是1 000000,也就是1 MB。如果生产者尝试发送超过这个大小的消息,那么不仅消息不会被broker接收,还会收到broker返回的错误信息。与其他broker配置参数一样,这个参数指的是压缩后的消息大小,也就是说,消息的实际大小可以远大于message.max.bytes,只要压缩后小于这个值即可。

这个参数对性能有显著的影响。值越大,负责处理网络连接和请求的线程用在处理请求上的时间就越长。它还会增加磁盘写入块的大小,从而影响I/O吞吐量。其他的存储解决方案,比如大对象存储或分层存储,可能也是解决大磁盘写入问题的一种方案。

消费者客户端设置的fetch.max.bytes需要与服务器端设置的消息大小保持一致。如果这个参数的值比message.max.bytes小,那么消费者就无法读取比较大的消息,进而造成阻塞,无法继续处理消息。在配置brokerreplica.fetch.max.bytes参数时,也遵循同样的原则。