Flink 1.9 Release 解读

距离上个发行版近 4 个月后,不久前 Apache Flink 发行了 1.9 系列的首个版本。Flink 1.9 是个有重要意义的版本,它初步合并了 Blink 的大部分新特性(虽然是预览特性),其中包括 Blink planner、Hive 集成、Python Table API 和新版 Web UI。此外,1.9 版本正式引入了 Savepoint Processor 来提供离线访问和修改 State 的能力,这也是社区呼声比较高的一个特性。下文将选取一些笔者认为比较重要的特性、improvement 和 bugfix 进行解读(主要集中在实时场景),详细的变动进参考 [1]。

Blink Planner

在阿里巴巴内部,Table API 和 SQL API 是开发 Flink 应用使用得最多的 API,因此阿里巴巴也花费了大量的精力在这两个 API 的优化上,其中最重要的一个便是 Blink Planner。Planner 是 SQL/Table API 和 runtime 的桥梁,它负责将 SQL/Table API 翻译为物理执行计划,也就是 runtime 的 operator。

图一. Planner 架构

比起 Flink 原生的 Planner,Blink Planner 主要有以下的优势:

  1. 流批统一。无论是 Stream 作业还是 Batch 作业都会直接被翻译为 StreamGraph,也就是常说的将批处理作为流处理的特例。
  2. 解耦 SQL/Table API 和 DataStream/DataSet API。这与流批统一是紧密联系的,因为 DataStream/DataSet 两者是独立的,基于它们很难建立简洁的流批统一架构。
  3. 建立统一的 Table/SQL 入口,简化当前多个 *Environment 的复杂 API 设计。
  4. 更多的 SQL 优化规则,比如 Join 的谓词下推和多余的 aggregate 移除。

Blink Planner 作为预览功能在 1.9 版本发布,用户可以通过引入依赖:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.0</version>
</dependency>

并在 main 函数中设置:

1
EnvironmentSettings.useBlinkPlanner();

来启用 Blink Planner。但值得注意的是目前 Blink Planner 仍有些未解决的 issue,比如不能在同一个 TableEnvironment 执行多条 SQL 语句,所以并不推荐在生产中使用。

SQL DDL

在以往版本,Flink SQL 只提供了 DSL 和 DML,而缺少了 DDL,这意味着我们不能在 SQL Client 或者其他 Flink SQL 程序中持久化创建的表。这个问题在 1.9 版本得到改善。1.9 支持了可用于 Batch 场景的标准 SQL DDL [2],但因为 Stream 场景的 DDL 要求定义时间属性(Time Characteristic)、 Watermark 算法和 append mode 等额外的参数,需要进一步考虑实现细节,则预计在 1.10 版本再支持。尽管两种 DDL 要求的信息略有不同,但最终的目标是提供统一的语法,换句话说用户不需要区分定义的 Table 是基于无边界还有有边界的数据集,Flink 会自动根据上下文来判断。

Hive 集成

目前 Flink 主要作为实时计算引擎,在与离线数据仓库组件 Hive 的集成方面做得并不足够,但随着流批统一的大趋势,Flink 在批处理方面的潜力也会逐渐被挖掘,比如在阿里巴巴 Flink 已经被引用于大部分的批处理场景。在与 Hive 的集成方面,我们可以看到另一个主流分布式计算引擎 Apache Spark 做得很好,而 Flink 的 Hive 集成也会主要参考 SparkSQL 的特性,其中比较重要的比如打通 Flink Table/SQL API 和 Hive Metasstaore、支持 Flink 作为 Hive 的计算引擎。

整体工作会分为三个步骤来完成[3]:

  1. Flink SQL 基础集成
    这包括在 Flink SQL API 集成 Hive 的基本功能,比如通过 Hive Connector 读写 Hive 表、支持 Hive 的数据类型(Timestamp/String 等)、可以在 Flink SQL 中使用 Hive 的内置函数和在 Flink SQL 中支持 Hive 的 DDL/DML 操作。这部分工作在 1.9 版本基本得到实现,主要的功能通过 HiveCatalog 封装的形式暴露给用户。顺带一提,为了支持 HiveCatalog,原本 Table API 的 Catalag 接口也进行了大量的重构。

  2. Hive 兼容性
    这部分可以概括为充分利用 Hive 的高级特性,其中包括完整的 Hive 数据类型支持、Thrift Server(类似于 Spark SQL Thrift Server)、在 Beeline 中支持 Flink 作为计算引擎、JDBC/ODBC 驱动支持、支持多种 Hive SerDe 等等。这部分的内容较多,将在后续版本逐步实现。

  3. Flink SQL 优化
    SQL 优化是 SQL 计算引擎老生常谈的课题,在 Hive on Flink SQL 上,可预见的优化工作当然也不少。这里的内容可以细分为三个方面:SQL 优化规则、Query metric(包括资源消耗、执行时间等)以及 Flink runtime 的 Task 调度和容错。

总而言之,Flink 和 Hive 集成将极大地提升 Flink SQL 在 Batch 场景的应用能力,同时随着流批统一,尤其是 SQL 的流批统一,Streaming SQL 也将从中受益。

终止/暂停作业

目前 Flink 提供了 cancel-with-savepoint 的选项以方便用户在停止作业时持久化作业状态,在底层它会分为两步: 1. 触发 Savepoint 快照;2. Savepoint 完成后 cancel 作业。这会主要带来三个数据一致性上的问题。

首先,Flink 依靠两步提交(Tow-Phase Commit)来确保 Exactly-Once,简单来说 Operator 收到 checkpoint barrier 时进行 State 快照,等全部 Operator 都完成快照后再统一由 JobManager 通知 commit。这里的 commit 是一个 best-efford 的操作,不保证每次成功,而是依靠失败状况下的重试保证最终成功(eventually succeed)。如果一次 checkpoint 顺利完成,但某个 Operator commit 时失败了,Flink 作业会重启并从这次成功的 checkpoint 恢复,恢复完毕后再次触发 JobManager 的 checkpoint 完成通知。问题在于在 cancel-with-savepoint 场景,savepoint 完成之后 Task 立刻被取消,这很可能发生在 Operator 进行 commit 之前,导致 commit 被跳过。

其次,checkpoint/savepoint 过程并不阻止 source 摄入数据,在 cancel-with-savepoint 取消作业时作业通常会多处理一部分数据,如果使用 At-Least-Once 的 sink 则会造成数据的重复。

最后,在 event-time 窗口统计的业务场景下,窗口数据的输出依赖于 watermark 的提升,如果用户希望在停止作业时输出目前的结果,比如作业准备下线需要保存最新的计算结果,则无法简单地做到这点,也就是说停止作业时总是得不到最新数据。

FLIP-34[4] 通过重构作业停止的流程设计解决了以上的问题。第一个问题根源在于 cancel 命令应该在 Task commit 之后再执行;第二个问题根源在于 Savepoint 设计上不能阻止消费,因为 Savepoint 可能被简单用于保存某个时间点的状态,之后并不一定会 cancel 作业;第三个问题类似于第二个问题,没有区分是要临时停止还是完全下线。

针对第二、第三点,FLIP-34 引入 TERMINATESUSPEND 两种停止作业的方式,前者表示完全下线希望全部提交中间状态,后者表示临时下线,比如维护升级等。两者都会触发 Source 发出 EOS (End of Stream)的信号,令 Task 变为 FINISHED 状态(目前停止作业后 Task 是 CANCELED 状态)。此外 TERMINATE 停止还会令 Source 额外发出一个 LONG 最大值的 Watermak,这会强制触发所有基于 event time 的操作,比如 event time 的窗口统计。

图二. TERMINATE/SUSPEND 总览

针对第一点,Savepoint 由默认的异步执行改为同步执行,因此 Task 的 commit 会阻塞其他操作,包括 cancel 命令,保证 Task 的 commit 总是能保证被执行。

重构之后停止作业的流程如下:

  1. JobManager 从 Source 端开始触发同步的 Savepoint(包括 TERMINATESUSPEND)。
  2. 如果是 TERMINATE 停止,Source 会额外发出 MAX_WATERMARK。
  3. TaskManager 收到 Savepoint barrier 之后执行同步的 Savepoint 快照,这会阻塞数据处理以及其他控制命令,直到快照结束。
  4. TaskManager 向 JobManager 确认 Savepoint 成功。
  5. JobManager 确认 Savepoint 完成并通知 TaskManager 进行第二阶段的 commit。
  6. TaskManager 进行 commit,并移除阻塞状态。
  7. Source 发出 EOS 信号,接受到信号的 TaskManager 依次关闭 Task。
  8. JobManager 会在等待所有 Task 和 Job 变为 FINISHED 后关闭。

在 CLI 端,Flink 新增了 stop 命令,其用法示例如下:

1
./bin/flink stop [-p targetDirectory] [-d] <jobID>

其中 -p 指定 Savepoint 的存储路径,-d 表示 TERMINATE 类型 stop。

重构 WebUI

1.9 版本合并了 Blink 的新 WebUI。新的 WebUI 从风格来说更加明亮简洁,交互也更加友好。此外之前版本的 WebUI 对日志的支持比较糟糕,是直接拉取机器上的日志文件展示在页面上,这在日志比较大的时候很卡并容易导致标签页崩溃。

新版的 WebUI 提供分页并有一个类似 IDE 的全局缩略图,可以很容易定位到某个位置(似乎就是通过 VS Code 的库,所以也附送了语法高亮的特性)。

图三. 新 WebUI 日志页面

此外在 JobGraph 上也提供了更多的监控信息,比如 InQueue 和 OutQueue 被暴露到 Operator 上,方面用户排查作业瓶颈。

图四. 新 WebUI Operators

Failover 策略

在 1.9 以前 Flink 遇到 Task 错误的默认行为是重启整个 Job,在作业比较大的情况下可能会带来很高的 downtime 成本。针对这个问题 FLIP1[5] 提出了细粒度的容错机制,提供 RestartRegion 的 Failover 策略,使得只有与错误 Task 有数据联系的 Task 会被重启。RestartRegion 其实并不是在 1.9 版本才用,但在之前存在一个严重的 bug 导致使用 RestartRegion 并不会恢复作业状态,因此应用范围很有限。1.9 版本修复了这个问题,并且将 RestartRegion 设为默认的策略。

Runtime 稳定性

1.9 版本修复了几个 Flink runtime 比较严重的 bug,将在这里统一整理。

Per-job 集群在 job 失败后没有自动退出

相信不少 Flink 用户都遇到过的一个问题是以 detached 模式,即 per-job cluster,运行作业时,作业失败后有一定几率出现 Yarn application 仍没有退出,变成类似一个没有作业的 session cluster。根据 FLINK-12219[6] 这个问题在于 JobManager 在退出先需要将作业的归档信息持久化(给 HistoryServer 用),但这个过程没有异常处理,如果出现出错将导致 JobManager 不执行关闭的命令。

单 Task 包含多个 Stateful Operator 时 RocksDB StateBackend 会丢失数据

这个问题在于 RocksDBStateBackend 使用的本地快照目录以 VertexID 而不是 Operator 作为生成目录的参数,当多个 Stateful Operator 被 chained 到一起时它们的本地快照目录会冲突,此时 RocksDB 会前一个 Operator 的状态会被后一个覆盖,导致状态丢失。FLINK-12296[7] 通过重构 RocksDB 本地快照目录的生成规则来解决了这个问题。

已取消 Checkpoint 可能造成作业失败

当一个 checkpoint 被取消时,其 checkpoint 目录(比如常见的 HDFS 目录)会被删除,但 Task 本地仍有可能会访问这个被删除的目录,此时会抛出 org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path doesn't exist 的异常,导致作业失败。FLINK-11662[8] 通过忽略已取消的 checkpoint 抛出的异常来修复了这个问题。

参考

  1. Flink 1.9 Release Changelog
  2. [FLINK-6962] Add a create table SQL DDL
  3. [FLINK-10556] Integration with Apache Hive
  4. FLIP-34: Terminate/Suspend Job with Savepoint
  5. FLIP-1 : Fine Grained Recovery from Task Failures
  6. [FLINK-12219] Yarn application can’t stop when flink job failed in per-job yarn cluster mode
  7. [FLINK-12296] Data loss silently in RocksDBStateBackend when more than one operator(has states) chained in a single task
  8. [FLINK-11662] Discarded checkpoint can cause Tasks to fail
  9. 如何在 Flink 1.9 中使用 Hive?