自 feature freezed 以后经过近一个月的努力,Flink 社区在十一月的最后一天终于发布 Flink 1.7.0 版本。该版本处理了 420 个 issue,其中新特性或者改进主要集中在 SQL 和 State 两个模块上,另外从 1.7 开始更多的 API (包括 REST、State 还有正在讨论的 runtime)会考虑版本兼容性,以便用户更重度地依赖 Flink 做上层的开发。
下文将选取一些笔者认为重要的新特性、improvement 和 bugfix 进行解读,详细的改动请参照 release notes1。
SQL 模块
CEP 与 SQL 的集成
CEP (Complex Event Processing) 是 Flink 常见的使用场景,其 DSL 语法与 SQL 类似但仍需要以编程 API 的方式调用。因此在 SQL 里支持 CEP 将大大降低使用门槛,对于用户来说仅需学习一个新的 SQL 函数。在 2016 年年底国际标准组织(IOS)发布了 Row Pattern Recognition in SQL [3],这为在 SQL 中实现 CEP 奠定了基础。
Flink CEP 的 SQL 支持是基于 Apache Calcite 的新函数 MATCH_RECOGNIZE
实现的,具体语法见下面的案例。场景是检测某个商品最近一次连续降价的开始时间和结束时间,给定输入为:
|
|
CEP SQL 语句如下:
|
|
SQL 语句具体的执行顺序如下:
- 依据
PARTITION BY
和ORDER BY
语句将MATCH_RECOGNIZE
语句的输入数据按 Key 逻辑切分和排序。 - 用
PATTERN
语句定义模式序列,语法与正则表达式一致。 - 根据
DEFINE
语句的条件将一行数据映射为模式变量,供后续使用。 - 根据
MEASURES
语句将模式变量进行表达式计算转换为最终数据结果。
上述案例的对应输出如下,分别是(商品标识, 降价开始时间, 最低价时间, 价格回升时间):
|
|
Streaming SQL 支持 Temporal Tables 和 Temporal Joins
Temporal Table 是 Flink 1.7 版本引入的新特性,简单来说它是有时间版本的 Streaming Table,可以用于基于时间版本的 Join(Temporal Joins)。
在之前的版本,Streaming SQL 中任何对 Table 的更新都会触发 Join 结果的更新,这意味着我们无法反映历史版本数据。举个例子,我们需要将订单中的价格按实时汇率转换为本地货币,其中订单数据来自是一个只会追加(append-only)的交易订单数据流,货币汇率是依据实时更新事件的数据流。此前如果直接将两个数据流 join 起来,当汇率进行变化时,之前订单的本地货币金额也会随之更新,这显然是不符合需求的。
为了实现支持时间版本的 Join,我们需要保存其中一个(维度)表的全部时间版本,然后根据事件数据流的时间决定具体使用哪个一个版本的数值。这体现在 SQL 上新增了一个 UDF API 来定义 Temporal Table,这需要配合 LATERAL TABLE
使用。
首先需要在 Table API 注册目标表并调用 API 创建针对该表的 UDF:
|
|
然后可以在 SQL 里使用 Rates
UDF:
|
|
新增 SQL 内置函数
函数名 | 描述 |
---|---|
TO_BASE64(string) | Returns the base64-encoded result from string; returns NULL if string is NULL. |
LOG2(numeric) | Returns the base 2 logarithm of numeric. |
LTRIM(string) | Returns a string that removes the left whitespaces from string. |
RTRIM(string) | Returns a string that removes the right whitespaces from string. |
REPEAT(string, integer) | Returns a string that repeats the base string integer times. |
REPLACE(string1, string2, string3) | Returns a new string which replaces all the occurrences of string2 with string3 (non-overlapping) from string1. |
COSH(numeric) | Returns the hyperbolic cosine of NUMERIC. |
SINH(numeric) | Returns the hyperbolic sine of numeric. |
TANH(numeric) | Returns the hyperbolic tangent of numeric. |
SQL Client 优化
SQL Client 是 Flink 1.5.0 引入的实验性特性,目的是为用户提供执行探索性查询的 SQL shell,而在 Flink 1.7.0 版本中 SQL Client 得到进一步的改进。Flink SQL 一直缺乏 DDL 的实现(目前正在开发中),定义表是通过启动 SQL Client 时指定的配置文件来完成,因此创建数据源一直是个比较繁琐的操作,尤其是在探索性的场景中,用户一般需要经常变换数据源或者表格式。不过现在虽然仍不支持在 SQL Client Session 中创建表,但可以用视图(View)来缓解这个问题。SQL Client 可以在配置文件或者 Session 中创建虚拟视图。视图会立即被解析和校验,但是”懒执行”的,即知道被调用输出结果的时候才会真正被计算。
State 模块
State Evolution
对于 Stateful 作业来说 State 是极为重要的信息,用户一般会期望可以在程序升级、State Schema 变化的情况下,原有的 State 仍可以向后兼容。原先的 State Schema 变化的兼容性需要依赖序列化器本身提供,换句话说我们是不能直接将 State 迁移到一个不兼容的序列号器上的,而是需要经过以下的步骤:
- 用旧反序列化器读取 State。
- 实现一个“迁移 map 函数”将每个 State 对象转换为新序列化器格式。
- 用新序列化器将 State 写回外部存储。
State Evolution 的目的在于封装整个流程,只暴露简单接口给用户。因为步骤 2 根据不同的序列号器/反序列化器不同而不同,所以目前限制了 State Evolution 只对方便灵活改变 Schema 的 Avro 类型的 State 有效,不过之后会拓展到更多的 State 类型。
Local Recovery
Flink 1.7.0 加入了 State 的 Local Recovery 特性,即每次 checkpoint 时 TaskManager 都会本地保存一份本地的 State,当作业重启恢复时 Flink 调度器会优先考虑将 Task State 的本地性,这样可以减少网络流量提高恢复的效率,对于并行度较高的作业来说是十分重要的。
值得注意的是在 1.7.0 版本 Flink 的 failover strategy 因为重大的 bug 被暂时移除了[6]。因此基本所有的 Flink 都是全局恢复模式,一个 task 的失败会导致全部 task 的重新调度,Local Recovery 在这种场景下可以大大减少恢复时长和减缓网络压力。
修复 Cancel with Savepoint 连接过早关闭的问题
这是由于在 per-job cluster 模式下,cancel with savepoint 会导致 cluster 在取消作业后马上关闭,但这是 Flink Client 可能还处在轮询 savepoint 路径的过程中。这会导致 java.net.ConnectException
,并且 Client 端无法得到 savepoint 的路径。这也是我们在做 Flink 作业平台时遇到的一个问题,并在内部分支的 1.5.3 版本和 1.6.0 版本用 closeTimeWait
的方式来简单修复。Flink 1.7 的实现会更加优雅: 用 CompletableFuture 确保 REST Server 在处理完所有的 pending 请求后再关闭,以保证 Client 在 Cluster 关闭前可以轮询到 Savepoint 路径。详情可见 [FLINK-10309][8]
Connector 模块
Exactly-once S3 StreamingFileSink
自 Flink 1.6 引入的 StreamingFileSink 现在新增了 S3 的 exactly-once 支持。此前用户需要使用 BucketingFileSink 来写 S3,而 BucketingFileSink 并不支持 Flink FileSystem,因此许多 Flink 层面的支持都没有办法使用,而现在用户可以安全地迁移到 StreamingFileSink 上。
Kafka 2.0 Connector
Flink 1.7 将抛弃掉针对每个版本 Kafka 定制 connector 的模式,转而提供新的 Kafka connector (modern Kafka connector) 来支持 1.0+ 版本的 Kafka [7]。新 connector 由腾讯的工程师团队贡献,对不同版本的 Kafka Client 实现了一个统一的 Facade,可以自动适配不同版本的 Kafka broker。不过由于兼容性问题,1.0 以下版本的 Kafka client 并不在支持列表内,而对应的 connector 也将一直保留。
Scala 2.12 支持
Flink 1.7 全面支持了 Scala 2.12,但由于 Scala 2.12 与 Scala 2.11 的语法有不兼容,所以部分 Public API 的一致性会被破坏,迁移到新版本的 Scala 用户要注意。
参考文献
- Flink 1.7.0 Release Notes
- FLIP-20: Integration of SQL and CEP
- Row Pattern Recognition in SQL
- Enrichment joins with Table Version Functions in Flink
- State Schema Evolution
- FLINK-10712 RestartPipelinedRegionStrategy does not restore state
- Flink即将在1.7版本发布全新的Kafka连接器
- FLINK-10309 Cancel with savepoint fails with java.net.ConnectException when using the per job-mode