Flink Event Time 倾斜

数据倾斜是分布式领域最为常见及棘手的问题之一。通常来说分布式系统会以数据的 key 来对数据进行 partition (分区),不同 key 的计算和状态都是独立,因此可以用分治的方式并行处理。当发生数据倾斜时,数据会集中到少部分的 key 上,导致这些 key 对应的 subtask 负载比其他 subtask 高得多。而在使用 event time 时间特性和 watermak 机制的实时计算系统里面,数据倾斜有了新的表现形式,即 event time 倾斜(event time skew),不同数据流的 event time 存在差异。

并行数据流中的 watermark

首先来简单回顾下 Flink 的 watermark 机制,如果读者已经比较熟悉可以跳过这一节。在存在多并行度 Source 的 Flink 作业中,每个 Soure 实例(准确来说应该是 watermark assigner)会独立产生 watermark。watermark 会以广播的形式传播到下游,下游算子的每个实例收到后更新自己的 low watermark,并将自己的 low watermark 作为新的 watermark 广播给下游。如果一个算子实例有多个输入数据流,它的 low watermark 会以最小的一个为准。

并行视图的watermak

如图 1 所示,算子实例右上角的黄色框数字表示算子实例的 low watermark,数据管道末端的黄色框数字表示该数据管道的 low watermark,数据管道中的白色框表示 (id|timestamp) 形式的数据元素,数据管道中的虚线段表示 watermark 元素。在 map 算子后面接着一个 keyBy 操作,因此下游的 window 算子的实例会接受上游多个输入数据流。

可以看到 Source(1) 的 watermark 提升得比较快已经达到 33(到达 window 算子的为29),但受限于 Source(2) 的 watermark 还在 17(到达 window 算子的为14),最下游 window 实例的 low watermark 均为 14。顺带一提,window 算子实例的 low watermark 总是相同的,因为如果是 keyed window 每个实例的 low watermark 均取上游算子实例 low watermark 最小的一个;如果是 all window 则该算子只有一个实例。

Event Time 倾斜

在读取历史数据时,比如从某个 Kafka topic 的最早 offset 开始读,各个 partition 的event time 很可能是不同的。如果有一个 partition 的 event time 明显比其他 partition 慢,那么下游所以算子的 low watermark 都会被其拖慢,而其他 partition 的新数据又会一直读入,这对于部分依赖 event time 进行计算的算子来说会造成不必要的数据缓存。

比如对于使用了低级 API ProcessingFunction 的 event time window 算子,它的触发条件是 low watermark 达到某个值,在此之前它会缓存该窗口内的所有数据。在 event time 倾斜的情况下,它一直得不到触发,但又要一直摄入新数据导致 State 不断增长。这会给内存和 checkpoint 机制带来很大的压力,使得作业的可能最终因 OOM 崩溃并且重启恢复需要的时间也异常地长。

比起传统的数据倾斜,event time 数据倾斜更加难以预料和处理。从原因来说,数据倾斜是逻辑上的分布不均匀,常常是由于业务的流量分布造成的,比如微博大 V 用户和普通用户的访问流量肯定不在一个等级,这种分布是比较容易提前预计的和稳定的;而 event time 数据倾斜通常和数据的物理存储相关,更加底层,因此原因也更动态和多样。比如 Kafka topic 扩容时新增的 partiton,从 earliest 消费得到的 event time 可能是更新的。再比如在一个公用的 topic 下,以游戏代号作为 key 写入 partition,如果某个游戏搞活动,流量在一小段时间内增加,那么这段时间这个 partition 的 event time 就比其他 partition 要慢。从处理的办法来说,数据倾斜主要是在业务层,用户可以在计算时再 re-partition 处理,而 event time 倾斜发生在数据摄入阶段,用户的控制权比较少,更多是需要计算框架层面的支持。

应对方案

Flink 社区对于 event time 倾斜问题的重要性已经达成共识,在应对方案上也有了基础的思路。最基本的思路就是,既然拉取更多的其他 partition 的数据并不能提升下游的 low watermark,为什么不减缓或者阻塞他们的消费呢?最好的情况下,Source 实例的 watermark 可以像 checkpoint 一样校准。

围绕这个思路主要有两种实现方式:

  1. 从下游算子入手,利用反压的机制来抑制对 watermark 前进过快的数据管道的摄入,比方说下游读取数据时对 watermark 更低的输入流给予更高的优先级。实际上这也是 Kafka Stream 的处理方式(见KIP-353)。这种方式比较嵌合 Flink 现有的机制,但也存在两个关键的问题。首先,减缓读取部分输入流的同时,会使得输入流中的 barrier 也被延缓,导致 checkpoint 校准产生一定的数据缓存。其次,这种方式只是尽力而为,并不保证反压到 Source 端的效果,有可能反压之后 Source 实例间的 watermark 依然有较大的差距。

  2. 直接增加一个协调者来为 Source 的 watermark 进行校准,比如 SourceCoodinator。SourceCoodinator 可以是 JobManager 的一部分,就像 CheckpointCoordinator 一样。每个 Source 实例需要定期向 SourceCoodinator 报告目前的 watermark,并接受 SourceCoodinator 的返回决定是否需要继续拉取数据。这种方式并不会影响 checkpoint 机制,准确来说应该说是建立了 checkpoint 并行的机制,不过需要对目前的 Source API 进行一定的重构。

从反馈意见来说,社区大多数是偏向用第二种方式来实现,因为第一种方式的耦合性和复杂度太高,而且对于 stateful streaming 来说任何影响到准确性的改动都应该十分谨慎。

总结

Event time 数据倾斜是基于 event time 和 watermark 机制的实时计算面临的新问题,主要表现不同 Source 实例的 watermark 不同,导致下游基于 event time 的算子需要不断缓存 watermark 较高的实例的数据。这个问题发生在数据摄入阶段,因此需要从计算框架层面提供支持。Kafka Stream 的处理办法是通过优先消费时间戳更小的输入流来平衡不同 partition 的 event time,但对于 Flink 来说会影响到 checkpoint 机制,所以 Flink 社区目前更希望通过引入 Source 实例的协调机制来解决。

参考文献

  1. Flink mail list: Sharing state between subtasks
  2. Watermarks in Parallel Streams
  3. KIP-353: Improve Kafka Streams Timestamp Synchronization