TrumanWong

Kafka生产者配置

TrumanWong
6/29/2024

提示:本文基于Kafka 3.7.0写成

Kafka生产者有很多可配置的参数,在Kafka生产者配置文档中都有说明。它们大部分有合理的默认值,没有必要进行修改。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,本文将详细介绍它们。

client.id

client.id是客户端标识符,它的值可以是任意字符串,broker用它来识别从客户端发送过来的消息。client.id可以被用在日志、指标和配额中。选择一个好的客户端标识符可以让故障诊断变得更容易些。

acks

默认值:all

acks指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。在默认情况下,Kafka会在首领副本收到消息后向客户端回应消息写入成功。这个参数对写入消息的持久性有重大影响,对于不同的场景,使用默认值可能不是最好的选择。

  • acks=0

    如果acks=0,则生产者不会等待任何来自broker的响应。也就是说,如果broker因为某些问题没有收到消息,那么生产者便无从得知,消息也就丢失了。不过,因为生产者不需要等待broker返回响应,所以它们能够以网络可支持的最大速度发送消息,从而达到很高的吞吐量。

  • acks=1

    如果acks=1,那么只要集群的首领副本收到消息,生产者就会收到消息成功写入的响应。如果消息无法到达首领副本(比如首领副本发生崩溃,新首领还未选举出来),那么生产者会收到一个错误响应。为了避免数据丢失,生产者会尝试重发消息。不过,在首领副本发生崩溃的情况下,如果消息还没有被复制到新的首领副本,则消息还是有可能丢失。

  • acks=allacks=-1

    如果acks=all,那么只有当所有副本全部收到消息时,生产者才会收到消息成功写入的响应。这种模式是最安全的,它可以保证不止一个broker收到消息,就算有个别broker发生崩溃,整个集群仍然可以运行。不过,它的延迟比acks=1高,因为生产者需要等待不止一个broker确认收到消息。

从上面可以看到,为acks设置的值越小,生产者发送消息的速度就越快。也就是说,我们通过牺牲可靠性来换取较低的生产者延迟。不过,端到端延迟是指从消息生成到可供消费者读取的时间,这对3种配置来说都是一样的。这是因为为了保持一致性,在消息被写入所有同步副本之前,Kafka不允许消费者读取它们。因此,如果你关心的是端到端延迟,而不是生产者延迟,那么就不需要在可靠性和低延迟之间做权衡了:你可以选择最可靠的配置,但仍然可以获得相同的端到端延迟。

消息传递时间

max.block.ms

默认值: 60000 (1分钟)

这个参数用于控制在调用send()或通过partitionsFor()显式地请求元数据时生产者可以发生阻塞的时间。当生产者的发送缓冲区被填满或元数据不可用时,这些方法就可能发生阻塞。当达到max.block.ms配置的时间时,就会抛出一个超时异常。

delivery.timeout.ms

默认值: 540000(9分钟)

这个参数用于控制从消息准备好发送(send()方法成功返回并将消息放入批次中)到broker响应或客户端放弃发送(包括重试)所花费的时间。如果配置的时间不满足这一点,则会抛出异常。通常,成功发送消息的速度要比delivery.timeout.ms快得多。如果生产者在重试时超出了delivery.timeout.ms,那么将执行回调,并会将broker之前返回的错误传给它。如果消息批次还没有发送完毕就超出了delivery.timeout.ms,那么也将执行回调,并会将超时异常传给它。这个时间应该大于或等于linger.msrequest.timeout.ms

request.timeout.ms

默认值:30000 (30秒)

这个参数用于控制生产者在发送消息时等待服务器响应的时间。需要注意的是,这是指生产者在放弃之前等待每个请求的时间,不包括重试、发送之前所花费的时间等。如果设置的值已触及,但服务器没有响应,那么生产者将重试发送,或者执行回调,并传给它一个TimeoutException

linger.ms

默认值:0

这个参数指定了生产者在发送消息批次之前等待更多消息加入批次的时间。生产者会在批次被填满或等待时间达到linger.ms时把消息批次发送出去。在默认情况下,只要有可用的发送者线程,生产者都会直接把批次发送出去,就算批次中只有一条消息。把linger.ms设置成比0大的数,可以让生产者在将批次发送给服务器之前等待一会儿,以使更多的消息加入批次中。虽然这样会增加一点儿延迟,但也极大地提升了吞吐量。这是因为一次性发送的消息越多,每条消息的开销就越小,如果启用了压缩,则计算量也更少了。

retriesretry.backoff.ms

retries默认值:2147483647

retry.backoff.ms默认值:1000 (1秒)

当生产者收到来自服务器的错误消息时,这个错误有可能是暂时的(例如,一个分区没有首领)。在这种情况下,retries参数可用于控制生产者在放弃发送并向客户端宣告失败之前可以重试多少次。在默认情况下,重试时间间隔是100毫秒,但可以通过retry.backoff.ms参数来控制重试时间间隔。

并不建议在当前版本的Kafka中使用这些参数。相反,你可以测试一下broker在发生崩溃之后需要多长时间恢复(也就是直到所有分区都有了首领副本),并设置合理的delivery.timeout.ms,让重试时间大于Kafka集群从崩溃中恢复的时间,以免生产者过早放弃重试。

生产者并不会重试所有的错误。有些错误不是暂时的,生产者就不会进行重试(例如,“消息太大”错误)。通常,对于可重试的错误,生产者会自动进行重试,所以不需要在应用程序中处理重试逻辑。你要做的是集中精力处理不可重试的错误或者当重试次数达到上限时的情况。

如果想完全禁用重试,那么唯一可行的方法是将retries设置为0。

buffer.memory

默认值:33554432

这个参数用来设置生产者要发送给服务器的消息的内存缓冲区大小。如果应用程序调用send()方法的速度超过生产者将消息发送给服务器的速度,那么生产者的缓冲空间可能会被耗尽,后续的send()方法调用会等待内存空间被释放,如果在max.block.ms之后还没有可用空间,就抛出异常。需要注意的是,这个异常与其他异常不一样,它是send()方法而不是Future对象抛出来的。

compression.type

默认值:none

在默认情况下,生产者发送的消息是未经压缩的。这个参数可以被设置为none, gzipsnappylz4、或zstd,这指定了消息被发送给broker之前使用哪一种压缩算法。snappy压缩算法由谷歌发明,虽然占用较少的CPU时间,但能提供较好的性能和相当可观的压缩比。如果同时有性能和网络带宽方面的考虑,那么可以使用这种算法。gzip压缩算法通常会占用较多的CPU时间,但提供了更高的压缩比。如果网络带宽比较有限,则可以使用这种算法。使用压缩可以降低网络传输和存储开销,而这些往往是向Kafka发送消息的瓶颈所在。

batch.size

默认值:16384

当有多条消息被发送给同一个分区时,生产者会把它们放在同一个批次里。这个参数指定了一个批次可以使用的内存大小。需要注意的是,该参数是按照字节数而不是消息条数来计算的。当批次被填满时,批次里所有的消息都将被发送出去。但是生产者并不一定都会等到批次被填满时才将其发送出去。那些未填满的批次,甚至只包含一条消息的批次也有可能被发送出去。所以,就算把批次大小设置得很大,也不会导致延迟,只是会占用更多的内存而已。但如果把批次大小设置得太小,则会增加一些额外的开销,因为生产者需要更频繁地发送消息。

max.in.flight.requests.per.connection

默认值:5

这个参数指定了生产者在收到服务器响应之前可以发送多少个消息批次。它的值越大,占用的内存就越多,不过吞吐量也会得到提升。实验数据表明,在单数据中心环境中,该参数被设置为2时可以获得最佳的吞吐量,但使用默认值5也可以获得差不多的性能。

Kafka可以保证同一个分区中的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息,那么broker会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。

假设我们把retries设置为非零的整数,并把max.in.flight.requests.per.connection设置为比1大的数。如果第一个批次写入失败,第二个批次写入成功,那么broker会重试写入第一个批次,等到第一个批次也写入成功,两个批次的顺序就反过来了。

我们希望至少有2个正在处理中的请求(出于性能方面的考虑),并且可以进行多次重试(出于可靠性方面的考虑),这个时候,最好的解决方案是将enable.idempotence设置为true。这样就可以在最多有5个正在处理中的请求的情况下保证顺序,并且可以保证重试不会引入重复消息。

enable.idempotence

默认值:true

从0.11版本开始,Kafka支持精确一次性exactly once语义。幂等生产者是它的一个简单且重要的组成部分。

假设为了最大限度地提升可靠性,你将生产者的acks设置为all,并将delivery.timeout.ms设置为一个比较大的数,允许进行尽可能多的重试。这些配置可以确保每条消息被写入Kafka至少一次。但在某些情况下,消息有可能被写入Kafka不止一次。假设一个broker收到了生产者发送的消息,然后消息被写入本地磁盘并成功复制给了其他broker。此时,这个broker还没有向生产者发送响应就发生了崩溃。而生产者将一直等待,直到达到request.timeout.ms,然后进行重试。重试发送的消息将被发送给新的首领,而这个首领已经有这条消息的副本,因为之前写入的消息已经被成功复制给它了。现在,你就有了一条重复的消息。

为了避免这种情况,可以将enable.idempotence设置为true。当幂等生产者被启用时,生产者将给发送的每一条消息都加上一个序列号。如果broker收到具有相同序列号的消息,那么它就会拒绝第二个副本,而生产者则会收到DuplicateSequenceException,这个异常对生产者来说是无害的。

如果要启用幂等性,那么max.in.flight.requests.per.connection应小于或等于5、retries应大于0,并且acks被设置为all。如果设置了不恰当的值,则会抛出ConfigException异常。

max.request.size

默认值:1048576

这个参数用于控制生产者发送的请求的大小。它限制了可发送的单条最大消息的大小和单个请求的消息总量的大小。假设这个参数的值为1 MB,那么可发送的单条最大消息就是1 MB,或者生产者最多可以在单个请求里发送一条包含1024个大小为1 KB的消息。另外,broker对可接收的最大消息也有限制message.max.bytes,其两边的配置最好是匹配的,以免生产者发送的消息被broker拒绝。

receive.buffer.bytessend.buffer.bytes

receive.buffer.bytes默认值:65536

send.buffer.bytes默认值:131072

这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。如果它们被设为–1,就使用操作系统默认值。如果生产者或消费者与broker位于不同的数据中心,则可以适当加大它们的值,因为跨数据中心网络的延迟一般都比较高,而带宽又比较低。