Flink 轻量级异步快照 ABS 实现原理

准确一次(exactly once)的送达保证是实时计算的关键特性之一,这要求作业从失败恢复后的状态以及管道中的数据流要和失败时一致,通常这是通过定期对作业状态和数据流进行快照实现的。然而这种方式主要有两点不足:首先,快照进行期间常常要暂停数据流的摄入,造成额外延迟和吞吐量下降;其次,快照会过度谨慎地将管道里正在计算的数据也随着状态保存下来,导致快照过于庞大。针对以上两个问题,Apache Flink(下简称 Flink)引入了异步屏障快照(Asynchronous Barrier Snapshot, ABS)。

异步屏障快照是一种轻量级的快照技术,能以低成本备份 DAG(有向无环图)或 DCG(有向有环图)计算作业的状态,这使得计算作业可以频繁进行快照并且不会对性能产生明显影响。异步屏障快照核心思想是通过屏障消息(barrier)来标记触发快照的时间点和对应的数据,从而将数据流和快照时间解耦以实现异步快照操作,同时也大大降低了对管道数据的依赖(对 DAG 类作业甚至完全不依赖),减小了随之而来的快照大小。下面将逐步分析异步屏障快照的实现原理。

计算作业模型

计算作业通常用一个图来表达,即:

1
G = (T, E)

其中,T 为作业的子任务(Flink 中又称 Operator,算子)的集合,即图的节点,E 为子任务间的数据通道的集合,即图的边。

上述是静态的作业模板,当实际执行时会生成一个作业实例,这可以用一个有状态的图来表示,即:

1
G* = (T*, E*)

其中,T 为每个算子状态的集合,E 为每个数据通道内的具体数据。

因此,对计算作业进行快照备份可以分为对算子状态的备份和对数据通道内数据的备份两部分。其中算子状态最为关键,而数据通道状态只是作为补充,然而数据通道内的数据却通常比算子的状态大得多,会导致快照操作成本很高。那么有没有可能避免备份数据通道的状态呢?答案是肯定的。其中一个可行的方法就是将作业的执行划分为多个阶段(Stage),每个阶段对应一个数据流窗口以及所有相关的计算结果。一个阶段结束时所有算子的状态反应了当前时间点的历史状态,因此可以单独被用于表达作业状态。所以如何划分阶段是重中之重,这就涉及到核心概念屏障(Barrier)。

屏障

屏障是一种特殊的内部消息,用于将数据流从时间上切分为多个窗口,每个窗口对应一系列连续的快照中的一个。屏障由 JobManager 定时广播给计算任务所有的 Source,其后伴随数据流一起流至下游。每个 barrier 是属于当前快照的数据与属于下个快照的数据的分割点。

实时数据流屏障

如图1所示,每当接收到屏障,算子便会将对当前的状态做一次快照,成功后将屏障以广播形式传给下游。最后屏障会流至作业的 Sink,Sink 接受到屏障后即向 JobManager 确认,后者收到所有 Sink 的确认标志着一个完整快照的生成。快照完成后,当前的快照窗口随之关闭,算子不会再向上游请求早于这个时间的数据,因为这些数据已经流过整个作业,被转化为作业状态的一部分。

快照时数据流校准

其中有种比较特殊的情况是一个算子接收了两个数据流,比如对它们进行 windowed join 的情况(如图2所示),这时屏障会驱使算子对两个数据流进行校准(Aligning)。如图2所示,校准的每个步骤如下:

  • 从其中一个上游数据流接收到快照屏障 barrier n 后,算子会暂停对该数据流的处理,直到从另外一个数据流也接收到 barrier n 它才会重新开始处理对应该数据流。这种机制避免了两个数据流的效率不一致,导致算子的快照窗口包含不属于当前窗口的数据。
  • 算子暂停对一个数据流的处理并不会阻塞该数据流,而是将数据暂时缓存起来,因为这些数据属于快照 n+1 ,而不是当前的快照 n。
  • 当另一个数据流的 barrier n 也抵达,算子会将所有正在处理的结果发至下游,随后做快照并发送 barrier n 到下游。
  • 最后,算子重新开始对所有数据流的处理,当然优先处理的是已经被缓存的数据。

送达语义

对多数据流进行校准会导致木桶原理,算子总体的处理效率取决于输入数据流里最慢的那个。通常校准会带来数毫秒的延迟,这在大多数情况下是可以接受的。然而一些对延迟特别敏感的应用可能比起数据的准确性更需要低延迟,因此 Flink 允许用户切换到非校准模式,但是这将会令作业的送达语义由准确一次降级到至少一次。

当校准步骤被跳过,即使收到某个输入数据流的 barrier n 算子也会继续工作。这样的话,属于第 n+1 个快照的数据也会被算到第 n 个快照窗口的算子状态里。当使用这个快照进行恢复时,这部分数据便会被重复计算,因为它们已经存在于快照的状态里,但 barrier n 以后的数据还会重流一遍。

另外,因为校准只作用于多输入或多输出的算子,对于单个输入/输出数据流的算子,即使用户将送达语义设置为至少一次,实际上还是准确一次。

异步快照

上文所述的屏障快照实际上仍是同步的,即在做快照的过程中算子会暂停处理新的数据,每次快照都会引入额外的数据延迟。实际上这是不必要的,快照可以完全交由后台异步实现,而实现这点算子必须可以提供一个状态对象,这个状态对象需要保证随后的修改不会影响当前的状态,比如 RockDB 的 copy-on-write 数据结构。完成这异步快照的对象称为 StateBackend(当然也可以是同步的,取决于具体实现),用户可以使用 Flink 内置的 StateBackend 或自定义。

当从输入数据流收到快照屏障,算子开始异步拷贝当前的状态,它会立刻将屏障传至下游,并继续常规的工作。当后台的状态拷贝完成后,算子向 JobManager 确认快照成功。因为异步拷贝存在丢失数据的风险且不能保证顺序,因此作业快照成功的条件也随之改为所有 Sink 确认收到屏障加上所有有状态的算子都成功完成快照。

状态对象

如上文所述,作业的状态可以分为算子的状态和数据通道的数据,Flink 将这两者分别称为用户定义状态和系统状态,这有趣地体现了另外一个角度的观点: 从用户角度看,算子的状态是通过用户定义的作业图计算得出,是对用户可见的,而数据通道的缓存数据则处理更低的系统层面,是用户不可见的。

由于异步屏障快照技术不依赖于数据通道的缓存状态,在大多数的情况下我们并不需要对系统状态做快照,因此状态对象通常包含以下两者:

  • 快照开始时,每个并行数据流 Source 对应的 offset 或 position。
  • 每个有状态的算子所对应的持续化状态对象的指针。

而对于包含闭环的作业图,即 Cyclic Dataflows,Flink 需要额外对算子的某些数据通道状态做快照。这是因为处于闭环内的算子会多次计算下游回流的数据(作业图内这种边称为回边,back-edges),而恢复时重放数据流并不能恢复流经回边的数据,所以算子计算过的历史回流数据也需要作为快照的一部分保存起来。

闭环数据流的快照

如图3所示,对 Flink 对闭环数据流的快照处理如下:
a) 当前快照窗口打开,正常处理输入数据流。
b) 收到来自所有数据流的屏障 n ,算子进行异步快照,在回边上的一个红色小球(循环数据)被保存到快照中。
c) 收到来自所有数据流的屏障 n+1 ,算子启动另一次异步快照。此时上个快照窗口的数据已经回流完毕,因此包含的三个红色小球均被保存到快照中。

总结

Flink 通过 ABS 技术实现轻量级异步快照,大大降低了实时计算的作业快照的成本,这使得实时作业可以更频繁地进行快照。并且在常见的 DAG 作业中,作业缓存的数据将不会被保存到快照中,这意味着作业恢复期间不需要再对缓存数据进行补算,大大减短了故障恢复时间。相信 ABS 技术将会逐渐取代目前相对笨拙的全局作业快照,被更多计算引擎选作快照算法。

参考文献

  1. Lightweight Asynchronous Snapshots for Distributed Dataflows
  2. Flink Documentation - Stream Checkpointing
  3. Unified Batch and Real-Time Stream Processing Using Apache Flink