Kafka producer工作机制及性能监控

最近在做Kafka 1.0.0版本压测的时候,遇到了producer生产效率不稳定的情况。具体表现为当对6个以上有3个同步副本的TopicPartition进行生产时,producer每隔不定时长生产会被阻塞一段时间,控制台报出大量的batch expiring错误,整体吞吐量因此大打折扣。

虽然0.11.x版本之后的client性能测试脚本新增了许多metrics输出,以帮助用户监控和troubleshooting,然而在不了解producer背后运作机制的情况下,仅凭metrics字面意义是难以利用好这些数据的。因此,我决定花一个周末的时间来梳理一下Kafka producer端的架构及工作原理,特别是其中的延迟保证机制,另外整理下producer各项metrics背后的含义。

4月2日更新:初步确定producer被阻塞的原因是1.0.0版本broker端的bug,详情见KAFKA-6706

工作机制

如同大多数高吞吐量的系统设计,KafkaProducer将实际业务逻辑与IO分离以实现异步IO。Kafka producer包含了一个缓存池用于保存未发送到服务端的消息,同时后台的IO线程负责将消息转化为请求并传输至服务端。

具体produce流程如下:当KafkaProducer需要发送消息,首先会先从bootstrap servers获取最新的topic-partition信息,这个过程会阻塞生产线程,直到MetadataRequest完成。随后KafkaProducer开始做序列化和partition,将生成的消息追加到RecordAccumulator里。RecordAccumulator负责将消息收集分类形成batch,并维护消息的状态,决定什么情况下消息符合发送要求(ready),什么情况下消息会过期(expired)等等。RecordAccumulator拥有一个BufferPool实例,用于管理缓存消息的内存分配。当BufferPool耗尽内存之后,会阻塞生产线程,直至足够的内存被释放。在分配到需要的内存之后,消息会停驻在BufferPool中等待batch达到设置的batch.size或者足够时间即linger.ms,然后进入就绪(ready)状态。最后,后台IO线程Sender会不断检查就绪的batch,将其包装成ProduceRequest发送到broker。

延迟保证机制

因为消息队列是Kafka主要使用场景之一,提供在一定延迟内的消息送达对于用户十分重要。然而由于系统设计的问题,目前并没有一个配置来从out-of-box角度直接控制消息的延迟,取而代之的是两个主要的参数:block.max.msrequest.timeout.ms。其中max.block.ms包含两部分,一是请求topic元数据时的最大阻塞时长,二是等待BufferPool分配内存的最大阻塞时长,两者之和超过阈值会导致TimeoutException。而request.timeout.ms则有两重含义,分别是表示batch就绪后在RecordAccumulator等待的最大时长和等待ProduceRequest返回的最大时长,超过时长的请求会被认为失败,触发Producer进行重试并在重试次数耗光后返回失败。

整个消息生产的延时可以用下图(源自KIP-91)表示:

"produce延时"

各阶段的延迟如下:
1. send阶段: 发送TopicMetadataRequest获取最新元数据,然后将消息放至发送缓存池。如上文所述,该步骤可以阻塞最多block.max.ms。其后该记录会成为消息batch的一部分,有可能是原有的未满batch或者新的batch。
2. batching阶段: 在最坏的情况下,这个batch是新的,会在等待最多linger.ms之后被认为是ready状态,但它还需要等待被调度才会实际发送。linger在缓冲里通常有两种意义:一种是类似于链接的max.idle,即超过一定时长没有更新则关闭窗口;另外一种是类似于批处理的batch window,窗口经过一定时间后一定会关闭。这里的语义是后者。
3. await-send阶段: 等待传输到broker的机会。只有当相应partition所在broker处于sendable状态时,一个batch才可以被发送(比如受到max.inflight.requests限制时)。因为当Sender收到属于未知partition的消息时,需要阻塞线程向broker获取元数据。为了避免引入额外的超时参数,KIP-19用request.timeout.ms配置来决定在accumulator的batch是否过期。从batch变为ready状态计时就开始,直到batch被真正发送到网络上。注意这里有个比较特殊的地方是,KIP-19的机制不会将已知的partition的batch纳入超时统计的范围,但这个机制在KAFKA-2805中被移除,所以还是可以大致将这部分的最大延时设为request.timeout.ms加上等待Sender调度的时间。
4. inflight阶段: 消息已通过网络发送出去,producer等待请求响应以确认成功,request.timeout.ms以后超时返回错误。
5. retry阶段: 如果上一步请求超时或者发生可重试的错误,producer会等待retry.backoff.ms后再重试,直至retries耗尽。

Metrics

Kafka producer的metrics十分丰富,但过于细致缺乏概括性的直观指标,而且由于metrics分布在不同层级,相互之间不免有关联或者重叠,因此需要深入了解其工作机制后再分析这些metrics才有意义。

全局通用Metrics

全局通用Metrics统计的对象是所有broker,所以为“全局”,此外“通用”指的是这些metrics除了producer,还会出现在consumer/stream/connect中。

metric 含义
outgoing-byte-rate 每秒发至所有服务器的流出字节数
request-rate 每秒发送请求数的均值
request-size-avg 窗口内平均请求大小
request-size-max 窗口内最大请求大小
incoming-byte-rate 每秒收到所有服务器的流入字节数
request-latency-avg 平均请求延时,单位为ms
reuqest-latency-max 最大请求延时,单位为ms
response-rate 每秒收到返回数的均值

Broker级别的通用Metrics

Broker级别的通用Metrics含义与全局通用metrics相似,不同在于是针对每个broker分开统计,因此对于分析某个节点的异常十分有用。

metric 含义
connection-creation-rate 窗口内每秒新建连接数
connection-close-rate 创建内每秒关闭连接数
network-io-rate 每秒所有连接的网络平均读写次数
outgoing-byte-rate 每秒发至该broker的平均字节数
request-rate 每秒发送至该broker的平均请求数
request-size-avg 在broker窗口内所有请求的平均大小
request-size-max 在broker窗口内所有请求的最大大小
incoming-byte-rate 每秒从该broker接受的平均字节数
response-rate 每秒从该broker收到的平均响应数
select-rate IO层每秒检查新IO请求来执行的次数
io-wait-time-ns-avg IO线程等待可用socket的平均时间
io-wait-ratio IO线程等待可用socket的时间比例
io-time-ns-avg 每个select调用花费的平均时间,单位为纳秒
io-ratio IO线程执行IO操作的时间比例
connection-count 当前活跃的连接数
successful-authentication-rate 成功使用SASL或SSL成功认证的连接数
failed-authentication-rate 认证失败的连接数

Prodcuer Metrics

Producer metrics与下文的Producer Sender Metrics似乎有点重复,其实不然,这部分的metrics监控的是消息进入Accumulator之前的状态,反映的更多是BufferPool空间是否充足。

metric 含义
waiting-threads 等待消息进入缓存的被阻塞线程数
buffer-total-bytes 客户端可用的最大缓存大小(包含已使用的)
buffer-available-bytes 未被使用的缓存大小
bufferpool-wait-time 追加记录到缓存时等待空间分配的时间比例

Borker级别的Producer Sender Metrics

Producer Sender作为系统底层某块,提供了更为基础也更加直接好用的metrics,包括请求错误率、请求重试率等对性能十分关键的指标。更加重要的是由于网络的不可靠性,Sender往往是最容易成为瓶颈的地方,因此这部分的metrics也最为复杂。

metric 含义
batch-size-avg partition级别的平均batch大小
batch-size-max partition级别的最大batch大小
batch-split-rate 平均每秒split的batch数
batch-split-total split batch总数
compression-rate-avg 消息batch的平均压缩率
metadata-age 当前producer使用的元数据的年龄,单位为秒
produce-throttle-time-avg 每个请求被broker限流的平均时间
produce-throttle-time-max 单个请求被broker限流的最大时间
record-error-rate 每秒发送错误的平均记录数
record-error-total 发送错误的总记录数
record-queue-time-avg batch停留在发送缓存的平均时间
record-queue-time-max batch停留在发送缓存的最大时间
record-retry-rate 每秒平均重试记录数
record-retry-total 每秒总重试记录数
record-send-rate 每秒平均记录数
record-send-total 发送总记录数
record-size-avg 在broker窗口内所有记录的平均大小
record-size-max 在broker窗口内所有记录的最大大小
records-per-request-avg 每个请求包含的平均记录数
request-latency-avg 平均请求延时,单位为ms
reuqest-latency-max 最大请求延时,单位为ms
requestin-flight 当前在传输中等待响应的请求数

Topic级别的Producer Sender Metrics

如果说broker级别的Producer Sender metrics更注重基础设施的稳定性,topic级别的metrics更关心Prodcer和业务的结合,如batch size调优或compression调优这类需要结合消息内容的优化都可以参考这部分的metrics。

metric 含义
byte-rate 每秒发至该topic的平均字节数
byte-total 发至该topic的总字节数
compression-rate 发至该topic的消息batch的平均压缩率
record-retry-rate 每秒发向该topic的平均重试记录数
record-retry-total 每秒发向该topic的总重试记录数
record-error-rate 每秒发向该topic并发送错误的平均记录数
record-error-total 发向该topic并发送错误的总记录数
record-send-rate 每秒发至该topic的平均记录数
record-send-total 发至该topic的总记录数

后续改进

如你所见,目前Kafka的超时机制十分复杂并且将底层逻辑暴露给用户,并不友好。KIP-91建议引入新的超时机制,即将Producer的所有发送阶段的超时整合为一个参数,delivery.timeout.ms。从记录进入Acccumulator开始计时,包含组装batch、等待发送、发送等待响应以及重试,直至记录成功写入或者失败,在总体时长上提供不高于delivery.timeout.ms的保证。新的超时机制如下图:

"KIP-91 延迟机制"

新机制除了更为直观,也避免了之前复杂的逻辑带来的脆弱性。目前该KIP正处于开发阶段,预计会在2.0.0版本发布(听起来真是个遥不可及的时间。

参考文献

1.KIP-91: Provide Intuitive User Timeouts in The Produer
2.KIP-19: Add a request time out to NetworkClient
3.Kafka documentation
4.confluent-monitoring
5.KAFKA-1251: Add metrics to the producer

本文是原创文章,转载请注明:时间与精神的小屋 - Kafka producer工作机制及性能监控