Flink 1.7 Release 解读

Flink 1.7 Release

自 feature freezed 以后经过近一个月的努力,Flink 社区在十一月的最后一天终于发布 Flink 1.7.0 版本。该版本处理了 420 个 issue,其中新特性或者改进主要集中在 SQL 和 State 两个模块上,另外从 1.7 开始更多的 API (包括 REST、State 还有正在讨论的 runtime)会考虑版本兼容性,以便用户更重度地依赖 Flink 做上层的开发。

下文将选取一些笔者认为重要的新特性、improvement 和 bugfix 进行解读,详细的改动请参照 release notes1

SQL 模块

CEP 与 SQL 的集成

CEP (Complex Event Processing) 是 Flink 常见的使用场景,其 DSL 语法与 SQL 类似但仍需要以编程 API 的方式调用。因此在 SQL 里支持 CEP 将大大降低使用门槛,对于用户来说仅需学习一个新的 SQL 函数。在 2016 年年底国际标准组织(IOS)发布了 Row Pattern Recognition in SQL [3],这为在 SQL 中实现 CEP 奠定了基础。

Flink CEP 的 SQL 支持是基于 Apache Calcite 的新函数 MATCH_RECOGNIZE 实现的,具体语法见下面的案例。场景是检测某个商品最近一次连续降价的开始时间和结束时间,给定输入为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 输入
symbol rowtime price tax
====== ==================== ======= =======
'ACME' '01-Apr-11 10:00:00' 12 1
'ACME' '01-Apr-11 10:00:01' 17 2
'ACME' '01-Apr-11 10:00:02' 19 1
'ACME' '01-Apr-11 10:00:03' 21 3
'ACME' '01-Apr-11 10:00:04' 25 2
'ACME' '01-Apr-11 10:00:05' 18 1
'ACME' '01-Apr-11 10:00:06' 15 1
'ACME' '01-Apr-11 10:00:07' 14 2
'ACME' '01-Apr-11 10:00:08' 24 2
'ACME' '01-Apr-11 10:00:09' 25 2
'ACME' '01-Apr-11 10:00:10' 19 1

CEP SQL 语句如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SELECT *
FROM Ticker
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY rowtime
MEASURES
START_ROW.rowtime AS start_tstamp,
LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
LAST(PRICE_UP.rowtime) AS end_tstamp
ONE ROW PER MATCH
AFTER MATCH SKIP TO LAST PRICE_UP
PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
DEFINE
PRICE_DOWN AS
(LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
PRICE_UP AS
PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
) MR;

SQL 语句具体的执行顺序如下:

  1. 依据 PARTITION BYORDER BY 语句将 MATCH_RECOGNIZE 语句的输入数据按 Key 逻辑切分和排序。
  2. PATTERN 语句定义模式序列,语法与正则表达式一致。
  3. 根据 DEFINE 语句的条件将一行数据映射为模式变量,供后续使用。
  4. 根据 MEASURES 语句将模式变量进行表达式计算转换为最终数据结果。

上述案例的对应输出如下,分别是(商品标识, 降价开始时间, 最低价时间, 价格回升时间):

1
2
3
4
// 输出
symbol start_tstamp bottom_tstamp end_tstamp
========= ================== ================== ==================
ACME 01-APR-11 10:00:04 01-APR-11 10:00:07 01-APR-11 10:00:08

Streaming SQL 支持 Temporal Tables 和 Temporal Joins

Temporal Table 是 Flink 1.7 版本引入的新特性,简单来说它是有时间版本的 Streaming Table,可以用于基于时间版本的 Join(Temporal Joins)。

在之前的版本,Streaming SQL 中任何对 Table 的更新都会触发 Join 结果的更新,这意味着我们无法反映历史版本数据。举个例子,我们需要将订单中的价格按实时汇率转换为本地货币,其中订单数据来自是一个只会追加(append-only)的交易订单数据流,货币汇率是依据实时更新事件的数据流。此前如果直接将两个数据流 join 起来,当汇率进行变化时,之前订单的本地货币金额也会随之更新,这显然是不符合需求的。

为了实现支持时间版本的 Join,我们需要保存其中一个(维度)表的全部时间版本,然后根据事件数据流的时间决定具体使用哪个一个版本的数值。这体现在 SQL 上新增了一个 UDF API 来定义 Temporal Table,这需要配合 LATERAL TABLE 使用。

首先需要在 Table API 注册目标表并调用 API 创建针对该表的 UDF:

1
2
3
4
5
6
7
8
9
10
// 创建汇率历史表
DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
tEnv.registerTable("RatesHistory", ratesHistory);
// 创建并注册该表的 temporal table function
// 将 `r_proctime` 作为时间字段,`r_currency` 作为主键
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1)
tEnv.registerFunction("Rates", rates);

然后可以在 SQL 里使用 Rates UDF:

1
2
3
4
5
SELECT
SUM(o.amount * r.rate) AS amount
FROM Orders AS o
LATERAL JOIN (TABLE Rates(o.rowtime)) AS r
ON r.currency = o.currency;

新增 SQL 内置函数

函数名 描述
TO_BASE64(string) Returns the base64-encoded result from string; returns NULL if string is NULL.
LOG2(numeric) Returns the base 2 logarithm of numeric.
LTRIM(string) Returns a string that removes the left whitespaces from string.
RTRIM(string) Returns a string that removes the right whitespaces from string.
REPEAT(string, integer) Returns a string that repeats the base string integer times.
REPLACE(string1, string2, string3) Returns a new string which replaces all the occurrences of string2 with string3 (non-overlapping) from string1.
COSH(numeric) Returns the hyperbolic cosine of NUMERIC.
SINH(numeric) Returns the hyperbolic sine of numeric.
TANH(numeric) Returns the hyperbolic tangent of numeric.

SQL Client 优化

SQL Client 是 Flink 1.5.0 引入的实验性特性,目的是为用户提供执行探索性查询的 SQL shell,而在 Flink 1.7.0 版本中 SQL Client 得到进一步的改进。Flink SQL 一直缺乏 DDL 的实现(目前正在开发中),定义表是通过启动 SQL Client 时指定的配置文件来完成,因此创建数据源一直是个比较繁琐的操作,尤其是在探索性的场景中,用户一般需要经常变换数据源或者表格式。不过现在虽然仍不支持在 SQL Client Session 中创建表,但可以用视图(View)来缓解这个问题。SQL Client 可以在配置文件或者 Session 中创建虚拟视图。视图会立即被解析和校验,但是”懒执行”的,即知道被调用输出结果的时候才会真正被计算。

State 模块

State Evolution

对于 Stateful 作业来说 State 是极为重要的信息,用户一般会期望可以在程序升级、State Schema 变化的情况下,原有的 State 仍可以向后兼容。原先的 State Schema 变化的兼容性需要依赖序列化器本身提供,换句话说我们是不能直接将 State 迁移到一个不兼容的序列号器上的,而是需要经过以下的步骤:

  1. 用旧反序列化器读取 State。
  2. 实现一个“迁移 map 函数”将每个 State 对象转换为新序列化器格式。
  3. 用新序列化器将 State 写回外部存储。

State Evolution 的目的在于封装整个流程,只暴露简单接口给用户。因为步骤 2 根据不同的序列号器/反序列化器不同而不同,所以目前限制了 State Evolution 只对方便灵活改变 Schema 的 Avro 类型的 State 有效,不过之后会拓展到更多的 State 类型。

Local Recovery

Flink 1.7.0 加入了 State 的 Local Recovery 特性,即每次 checkpoint 时 TaskManager 都会本地保存一份本地的 State,当作业重启恢复时 Flink 调度器会优先考虑将 Task State 的本地性,这样可以减少网络流量提高恢复的效率,对于并行度较高的作业来说是十分重要的。

值得注意的是在 1.7.0 版本 Flink 的 failover strategy 因为重大的 bug 被暂时移除了[6]。因此基本所有的 Flink 都是全局恢复模式,一个 task 的失败会导致全部 task 的重新调度,Local Recovery 在这种场景下可以大大减少恢复时长和减缓网络压力。

修复 Cancel with Savepoint 连接过早关闭的问题

这是由于在 per-job cluster 模式下,cancel with savepoint 会导致 cluster 在取消作业后马上关闭,但这是 Flink Client 可能还处在轮询 savepoint 路径的过程中。这会导致 java.net.ConnectException,并且 Client 端无法得到 savepoint 的路径。这也是我们在做 Flink 作业平台时遇到的一个问题,并在内部分支的 1.5.3 版本和 1.6.0 版本用 closeTimeWait 的方式来简单修复。Flink 1.7 的实现会更加优雅: 用 CompletableFuture 确保 REST Server 在处理完所有的 pending 请求后再关闭,以保证 Client 在 Cluster 关闭前可以轮询到 Savepoint 路径。详情可见 [FLINK-10309][8]

Connector 模块

Exactly-once S3 StreamingFileSink

自 Flink 1.6 引入的 StreamingFileSink 现在新增了 S3 的 exactly-once 支持。此前用户需要使用 BucketingFileSink 来写 S3,而 BucketingFileSink 并不支持 Flink FileSystem,因此许多 Flink 层面的支持都没有办法使用,而现在用户可以安全地迁移到 StreamingFileSink 上。

Kafka 2.0 Connector

Flink 1.7 将抛弃掉针对每个版本 Kafka 定制 connector 的模式,转而提供新的 Kafka connector (modern Kafka connector) 来支持 1.0+ 版本的 Kafka [7]。新 connector 由腾讯的工程师团队贡献,对不同版本的 Kafka Client 实现了一个统一的 Facade,可以自动适配不同版本的 Kafka broker。不过由于兼容性问题,1.0 以下版本的 Kafka client 并不在支持列表内,而对应的 connector 也将一直保留。

Scala 2.12 支持

Flink 1.7 全面支持了 Scala 2.12,但由于 Scala 2.12 与 Scala 2.11 的语法有不兼容,所以部分 Public API 的一致性会被破坏,迁移到新版本的 Scala 用户要注意。

参考文献

  1. Flink 1.7.0 Release Notes
  2. FLIP-20: Integration of SQL and CEP
  3. Row Pattern Recognition in SQL
  4. Enrichment joins with Table Version Functions in Flink
  5. State Schema Evolution
  6. FLINK-10712 RestartPipelinedRegionStrategy does not restore state
  7. Flink即将在1.7版本发布全新的Kafka连接器
  8. FLINK-10309 Cancel with savepoint fails with java.net.ConnectException when using the per job-mode