Flink Watermark 机制浅析

Flink 为实时计算提供了三种时间,即事件时间(event time)、摄入时间(ingestion time)和处理时间(processing time)。在进行 window 计算时,使用摄入时间或处理时间的消息都是以系统的墙上时间(wall clocks)为标准,因此事件都是按序到达的。然而如果使用更为有意义的事件时间则会需要面对乱序事件问题(out-of-order events)和迟到事件问题(late events)。针对这两个问题,Flink 主要采用了以水位线(watermark)为核心的机制来应对。

窗口与水位线

当基于事件时间的数据流进行窗口计算时,最为困难的一点是如何确定对应当前窗口的事件已尽全部到达。比如需要统计最近5分钟打开音乐播放器的用户数,服务端怎么确保聚合计算时已经收集好所有用户最近5分钟的打开播放器日志?事实上不存在能百分百准确判断的方法,因此业界常用的方法是基于已经收集的消息来估算是否还有消息未到达,这就是水位线的思想。

水位线实际上是一个时间戳,意义是早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达。这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。

水位线的计算

理想情况下水位线应该与处理时间一致,并且处理时间与事件时间只相差常数时间甚至为零,这意味着消息产生后马上被处理。然而水位线的计算总是存在一定的延迟(见图1),具体的延迟根据水位线实现的不同而也有所差别。Flink 提供了常规的定期水位线以及定制化的标点水位线两种生成水位线的方式供用户选择。


Ideal Watermark versus Actual Watermark

定期水位线

定期水位线(Periodic Watermark)按照固定时间间隔生成新的水位线,不管是否有新的消息抵达。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

应用定期水位线需要实现AssignerWithPeriodicWatermarks API,以下是 Flink 官网提供的定期水位线的实现例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L; // 3.5 seconds
var currentMaxTimestamp: Long;
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}

其中extractTimestamp用于从消息中提取事件时间,而getCurrentWatermark用于生成新的水位线,新的水位线只有大于当前水位线才是有效的。每个窗口都会有该类的一个实例,因此可以利用实例的成员变量保存状态,比如上例中的当前最大时间戳。

标点水位线

标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。

应用标点水位线需要实现AssignerWithPunctuatedWatermarks API,以下是 Flink 官网提供的标点水位线的实现例子。

1
2
3
4
5
6
7
8
9
10
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (element.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}

其中extractTimestamp用于从消息中提取事件时间,checkAndGetNextWatermark用于检查事件是否标点事件,若是则生成新的水位线。不同于定期水位线定时调用getCurrentWatermark,标点水位线是每接受一个事件就需要调用checkAndGetNextWatermark,若返回值非 null 且新水位线大于当前水位线,则触发窗口计算。

迟到事件

虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

  1. 重新激活已经关闭的窗口并重新计算以修正结果。
  2. 将迟到事件收集起来另外处理。
  3. 将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side OutputAllowed Lateness

Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

参考文献

1.Flink 官方文档 - Event Time
2.Flink – process watermark