最近在做Kafka 1.0.0版本压测的时候,遇到了producer生产效率不稳定的情况。具体表现为当对6个以上有3个同步副本的TopicPartition进行生产时,producer每隔不定时长生产会被阻塞一段时间,控制台报出大量的batch expiring错误,整体吞吐量因此大打折扣。
虽然0.11.x版本之后的client性能测试脚本新增了许多metrics输出,以帮助用户监控和troubleshooting,然而在不了解producer背后运作机制的情况下,仅凭metrics字面意义是难以利用好这些数据的。因此,我决定花一个周末的时间来梳理一下Kafka producer端的架构及工作原理,特别是其中的延迟保证机制,另外整理下producer各项metrics背后的含义。
4月2日更新:初步确定producer被阻塞的原因是1.0.0版本broker端的bug,详情见KAFKA-6706。
工作机制
如同大多数高吞吐量的系统设计,KafkaProducer将实际业务逻辑与IO分离以实现异步IO。Kafka producer包含了一个缓存池用于保存未发送到服务端的消息,同时后台的IO线程负责将消息转化为请求并传输至服务端。
具体produce流程如下:当KafkaProducer需要发送消息,首先会先从bootstrap servers获取最新的topic-partition信息,这个过程会阻塞生产线程,直到MetadataRequest完成。随后KafkaProducer开始做序列化和partition,将生成的消息追加到RecordAccumulator里。RecordAccumulator负责将消息收集分类形成batch,并维护消息的状态,决定什么情况下消息符合发送要求(ready),什么情况下消息会过期(expired)等等。RecordAccumulator拥有一个BufferPool实例,用于管理缓存消息的内存分配。当BufferPool耗尽内存之后,会阻塞生产线程,直至足够的内存被释放。在分配到需要的内存之后,消息会停驻在BufferPool中等待batch达到设置的batch.size
或者足够时间即linger.ms
,然后进入就绪(ready)状态。最后,后台IO线程Sender会不断检查就绪的batch,将其包装成ProduceRequest发送到broker。
延迟保证机制
因为消息队列是Kafka主要使用场景之一,提供在一定延迟内的消息送达对于用户十分重要。然而由于系统设计的问题,目前并没有一个配置来从out-of-box角度直接控制消息的延迟,取而代之的是两个主要的参数:block.max.ms
和request.timeout.ms
。其中max.block.ms
包含两部分,一是请求topic元数据时的最大阻塞时长,二是等待BufferPool分配内存的最大阻塞时长,两者之和超过阈值会导致TimeoutException。而request.timeout.ms
则有两重含义,分别是表示batch就绪后在RecordAccumulator等待的最大时长和等待ProduceRequest返回的最大时长,超过时长的请求会被认为失败,触发Producer进行重试并在重试次数耗光后返回失败。
整个消息生产的延时可以用下图(源自KIP-91)表示:
各阶段的延迟如下:
1. send阶段: 发送TopicMetadataRequest获取最新元数据,然后将消息放至发送缓存池。如上文所述,该步骤可以阻塞最多block.max.ms
。其后该记录会成为消息batch的一部分,有可能是原有的未满batch或者新的batch。
2. batching阶段: 在最坏的情况下,这个batch是新的,会在等待最多linger.ms
之后被认为是ready状态,但它还需要等待被调度才会实际发送。linger
在缓冲里通常有两种意义:一种是类似于链接的max.idle
,即超过一定时长没有更新则关闭窗口;另外一种是类似于批处理的batch window,窗口经过一定时间后一定会关闭。这里的语义是后者。
3. await-send阶段: 等待传输到broker的机会。只有当相应partition所在broker处于sendable状态时,一个batch才可以被发送(比如受到max.inflight.requests
限制时)。因为当Sender收到属于未知partition的消息时,需要阻塞线程向broker获取元数据。为了避免引入额外的超时参数,KIP-19用request.timeout.ms
配置来决定在accumulator的batch是否过期。从batch变为ready状态计时就开始,直到batch被真正发送到网络上。注意这里有个比较特殊的地方是,KIP-19的机制不会将已知的partition的batch纳入超时统计的范围,但这个机制在KAFKA-2805中被移除,所以还是可以大致将这部分的最大延时设为request.timeout.ms
加上等待Sender调度的时间。
4. inflight阶段: 消息已通过网络发送出去,producer等待请求响应以确认成功,request.timeout.ms
以后超时返回错误。
5. retry阶段: 如果上一步请求超时或者发生可重试的错误,producer会等待retry.backoff.ms
后再重试,直至retries
耗尽。
Metrics
Kafka producer的metrics十分丰富,但过于细致缺乏概括性的直观指标,而且由于metrics分布在不同层级,相互之间不免有关联或者重叠,因此需要深入了解其工作机制后再分析这些metrics才有意义。
全局通用Metrics
全局通用Metrics统计的对象是所有broker,所以为“全局”,此外“通用”指的是这些metrics除了producer,还会出现在consumer/stream/connect中。
metric | 含义 |
---|---|
outgoing-byte-rate | 每秒发至所有服务器的流出字节数 |
request-rate | 每秒发送请求数的均值 |
request-size-avg | 窗口内平均请求大小 |
request-size-max | 窗口内最大请求大小 |
incoming-byte-rate | 每秒收到所有服务器的流入字节数 |
request-latency-avg | 平均请求延时,单位为ms |
reuqest-latency-max | 最大请求延时,单位为ms |
response-rate | 每秒收到返回数的均值 |
Broker级别的通用Metrics
Broker级别的通用Metrics含义与全局通用metrics相似,不同在于是针对每个broker分开统计,因此对于分析某个节点的异常十分有用。
metric | 含义 |
---|---|
connection-creation-rate | 窗口内每秒新建连接数 |
connection-close-rate | 创建内每秒关闭连接数 |
network-io-rate | 每秒所有连接的网络平均读写次数 |
outgoing-byte-rate | 每秒发至该broker的平均字节数 |
request-rate | 每秒发送至该broker的平均请求数 |
request-size-avg | 在broker窗口内所有请求的平均大小 |
request-size-max | 在broker窗口内所有请求的最大大小 |
incoming-byte-rate | 每秒从该broker接受的平均字节数 |
response-rate | 每秒从该broker收到的平均响应数 |
select-rate | IO层每秒检查新IO请求来执行的次数 |
io-wait-time-ns-avg | IO线程等待可用socket的平均时间 |
io-wait-ratio | IO线程等待可用socket的时间比例 |
io-time-ns-avg | 每个select调用花费的平均时间,单位为纳秒 |
io-ratio | IO线程执行IO操作的时间比例 |
connection-count | 当前活跃的连接数 |
successful-authentication-rate | 成功使用SASL或SSL成功认证的连接数 |
failed-authentication-rate | 认证失败的连接数 |
Prodcuer Metrics
Producer metrics与下文的Producer Sender Metrics似乎有点重复,其实不然,这部分的metrics监控的是消息进入Accumulator之前的状态,反映的更多是BufferPool空间是否充足。
metric | 含义 |
---|---|
waiting-threads | 等待消息进入缓存的被阻塞线程数 |
buffer-total-bytes | 客户端可用的最大缓存大小(包含已使用的) |
buffer-available-bytes | 未被使用的缓存大小 |
bufferpool-wait-time | 追加记录到缓存时等待空间分配的时间比例 |
Borker级别的Producer Sender Metrics
Producer Sender作为系统底层某块,提供了更为基础也更加直接好用的metrics,包括请求错误率、请求重试率等对性能十分关键的指标。更加重要的是由于网络的不可靠性,Sender往往是最容易成为瓶颈的地方,因此这部分的metrics也最为复杂。
metric | 含义 |
---|---|
batch-size-avg | partition级别的平均batch大小 |
batch-size-max | partition级别的最大batch大小 |
batch-split-rate | 平均每秒split的batch数 |
batch-split-total | split batch总数 |
compression-rate-avg | 消息batch的平均压缩率 |
metadata-age | 当前producer使用的元数据的年龄,单位为秒 |
produce-throttle-time-avg | 每个请求被broker限流的平均时间 |
produce-throttle-time-max | 单个请求被broker限流的最大时间 |
record-error-rate | 每秒发送错误的平均记录数 |
record-error-total | 发送错误的总记录数 |
record-queue-time-avg | batch停留在发送缓存的平均时间 |
record-queue-time-max | batch停留在发送缓存的最大时间 |
record-retry-rate | 每秒平均重试记录数 |
record-retry-total | 每秒总重试记录数 |
record-send-rate | 每秒平均记录数 |
record-send-total | 发送总记录数 |
record-size-avg | 在broker窗口内所有记录的平均大小 |
record-size-max | 在broker窗口内所有记录的最大大小 |
records-per-request-avg | 每个请求包含的平均记录数 |
request-latency-avg | 平均请求延时,单位为ms |
reuqest-latency-max | 最大请求延时,单位为ms |
requestin-flight | 当前在传输中等待响应的请求数 |
Topic级别的Producer Sender Metrics
如果说broker级别的Producer Sender metrics更注重基础设施的稳定性,topic级别的metrics更关心Prodcer和业务的结合,如batch size调优或compression调优这类需要结合消息内容的优化都可以参考这部分的metrics。
metric | 含义 |
---|---|
byte-rate | 每秒发至该topic的平均字节数 |
byte-total | 发至该topic的总字节数 |
compression-rate | 发至该topic的消息batch的平均压缩率 |
record-retry-rate | 每秒发向该topic的平均重试记录数 |
record-retry-total | 每秒发向该topic的总重试记录数 |
record-error-rate | 每秒发向该topic并发送错误的平均记录数 |
record-error-total | 发向该topic并发送错误的总记录数 |
record-send-rate | 每秒发至该topic的平均记录数 |
record-send-total | 发至该topic的总记录数 |
后续改进
如你所见,目前Kafka的超时机制十分复杂并且将底层逻辑暴露给用户,并不友好。KIP-91建议引入新的超时机制,即将Producer的所有发送阶段的超时整合为一个参数,delivery.timeout.ms
。从记录进入Acccumulator开始计时,包含组装batch、等待发送、发送等待响应以及重试,直至记录成功写入或者失败,在总体时长上提供不高于delivery.timeout.ms
的保证。新的超时机制如下图:
新机制除了更为直观,也避免了之前复杂的逻辑带来的脆弱性。目前该KIP正处于开发阶段,预计会在2.0.0版本发布(听起来真是个遥不可及的时间。
参考文献
1.KIP-91: Provide Intuitive User Timeouts in The Produer
2.KIP-19: Add a request time out to NetworkClient
3.Kafka documentation
4.confluent-monitoring
5.KAFKA-1251: Add metrics to the producer