众所周知,Apache Flink 是基于 JVM 语言开发的,所以提供的运行环境和编程 API 都是 JVM 语言(目前 Flink Python API 是用 Jython 实现的,因此也算 JVM 语言)。然而基于 JVM 开发的计算引擎普遍会遇到的一个问题是,做数据分析或机器学习的用户通常主要使用更声明式的语言,比如 Python 或者 R。因此为了支持多语言,尤其是非 JVM 语言,分布式计算领域业界在计算引擎的语言可移植性上做了不少的努力,其中比较出名的项目包括 SparkR、PySpark 和 Apache Beam。而目前 Apache Flink 社区也计划推进多语言支持,其中将优先支持 Python,下文将详细解析实现的关键点及具体方案。
业界实践经验
分布式计算引擎对语言的支持通常分为 SDK 和 UDF 两个部分: 其中 SDK 面向用户,用于构建作业逻辑;而 UDF 则面向运行时,为运行环境中实际调用的函数。相对地,实现多语言支持需要分为两个组件: 一是将提供目标语言和 JVM 之间的桥梁,令目标语言 SDK 可以调用 JVM SDK;二是提供目标语言 UDF 运行时环境及其和 JVM 交互的途径。值得注意的是,这里所说的 UDF 是与语言执行原生环境绑定的,而不是只在 JVM 里访问其他语言定义的函数。
以 SparkR 为例,其架构如下:
用户用 R 定义的作业逻辑拓扑会通过 R 与 JVM 间的语言桥梁翻译为 Java Spark SDK,即 Java Spark Context,后者与普通 Java 开发的作业一样启动 SparkExecutor,此为 SDK 部分;后续 SparkExecutor 并不是直接在进程内而是启动另外一个 R 进程来执行 UDF,此为 UDF 部分。
除计算引擎原生的支持外,Apache Beam 为计算引擎提供了另外一个更通用化的支持多语言的途径。Apache Beam 是一个统一流批处理并提供多语言支持的分布式计算框架面门(Facade),提供了计算逻辑在开发运行语言上和在计算引擎上的可移植性。Apache Beam 目前正在开发可移植性层框架,它主要由三个组件组成:
- SDK: SDK 负责提供 Java、Python 和 Go 等语言的编程接口,用于定义数据处理管道(Pipeline)及设置执行选项,比如运行的 Runner、并行度等。
- Runner: Runner 是底层负责将 Pipeline 翻译成执行引擎作业的组件。每个计算计引擎有对应的 Runner,目前已支持 Flink、Spark、Samza、Google Dataflow 等计算引擎。
- SDK harness: SDK harness 是执行用户代码(Beam Fn API, 即 UDF)的组件。SDK harness 可以以独立进程或者容器化的方式实现,环境是与 Runner 相互独立的。SDK harness 会根据执行时的二进制协议与 Runner 交互,包括如何管理 Task、何如传输数据。
不同组件间通过基于 protobuf 和 gPRC 的跨语言协议交流,以支持多种语言。
Flink 社区 Roadmap
基于上述背景,Flink 社区在支持非 JVM 语言上也分别有两种实现方案,即原生的 SDK 支持、原生的 UDF 支持或通过 Beam 来提供。其中社区可以选择其中一种或者多种都实现,关键点在于是否需要 Flink 自身的可移植性层,即 SDK 的可移植层和 UDF 的可移植层。
在 SDK 的可移植性上,Flink 社区基本认同这是必要的,因为 SDK 负责主要是定义作业逻辑拓扑和配置,比较轻量级,而且原生 SDK 作业用户入口需要尽可能完整地暴露 Flink 的功能,完全依赖 Beam 来实现是不可取的。
在 UDF 的可移植层上,社区则出现了两种观点:一种观点主要是来自同样活跃在 Beam 社区的工程师,他们认为可移植性层需要大量的工作,Beam 社区花费至少一年重构了大部分的代码才基本完成,所以比起重新发明可移植性层, Flink 社区更应该把精力用在和 Beam 的集成上;另一种观点主要来自阿里巴巴 Blink 团队的工程师,他们已经在 Blink 上实现了对 Python UDF 的支持,并认为 Flink 的可移植层可以更好地利用引擎原生的优化,所以可以在支持 Beam 的基础上也实现 Flink 本身的可移植层。
社区讨论最后并没有达成一致,不过作为进一步的探索,Flink 将首先在 Table API 上实现 Python SDK,用于满足数据分析用户的需求,不过 Python 或其他语言的 UDF 需等后续收集用户反馈再决定是否要提供。Python Table API 的设计文档见 FLIP-38 [2],虽然目前只针对 Table API,但我们可以从中窥见 Flink 对非 JVM 语言支持的发展方向。
Flink Table Python 支持
之所以首先在 Table API 支持 Python,原因是 Table API 将逐渐成为数据分析用户主要使用的 API,Python Table API 将大大提升用户友好度,而对于其他语言的需求还没有那么迫切。此外,Table API 比起底层的 DataStream/DataSet API 更为声明式,而且更符合流批处理统一的原则,并且可以在翻译成 JobGraph 时灵活应用优化规则。
目标
Python Table API 需要达成以下目标:
- 用户可以在 Python 环境使用与 Java 版本相同的 Flink Table API。
- 用户可以通过现有所有途径提交以 Python Table API 编写的作业,包括通过脚本提交(类似于
bin/flink run
),通过 REST 接口提交,以交互式 shell 的方式提交和本地 IDE 调试运行。 - 用户可以用 Python 编写 UDF、UDAF 和 UDTF。
- Pandas 函数可以用于 Python Table API。
简单来说,通过 Python Table API 用户可以不依赖 Java API 进行开发和部署,而运行时对于 Java 环境的依赖则是对用户透明的。另外,Python Table API 在首个 release 将只支持离线批处理模式和用于探索开发的交互式 shell 模式,后续再拓展至实时流处理模式。
架构
SDK
类似于 SparkR 的实现,Python SDK 相当于是在 Java SDK 基础上进行的一层封装,它通过 rpc 协议来与 Java SDK 交互,并将后者的功能完全暴露至 Python 解释器。其中 rpc 协议是实现跨语言的关键,具体来说有两种 rpc 实现方案。
一是利用已有的语言桥梁 lib,比如 Py4j。Py4j 自动负责 Java class 和 Python class 之间的映射,这个映射是通过 Python 解释器中的 stub(Python Gateway)和 JVM 中的 stub(Java Gateway Server)完成的(要避免误会的一点是,Py4j 是双向的,因此也可以是 Java Gateway 和 Python Gateway Server)。因此,在 Python SDK 中我们可以完全复用 Java Table API 的 class,开发量最少。
二是开发完整的 SDK 可移植层,每个语言的 SDK 可以独立建立 JobGraph 对象,而不是基于 Java 已有的实现。这要求将 DAG 生成和作业配置等 SDK 功能提到每个语言的 SDK 来实现,并且 JobGraph 需要重构为和具体语言无关的数据结构,用 protobuf 等序列化框架来表达。
从长远来说方案二是模块化和可拓展性更好的架构,不过出于大方向仍未完全确定的考虑,Flink 在初期将先以开发成本较低的方案一实现 Python Table API。
UDF
UDF 则是支持非 JVM 语言最为复杂的一部分,尤其是对于 Flink 这样的有状态计算来说。从设计原则来说,UDF 会运行在一个类似容器的独立环境,需要通过 RPC 的方式来和 Flink Operator 进行通信,因此如何实现 RPC 服务将 Flink Operator 的功能暴露给外部是关键点。在这个问题上,Beam 社区已经有了很好的实践经验。对于有状态计算, Beam 将对 UDF 的支持抽象为四个服务:
- Control Service: 负责 UDF 的管理,比如启停 SDC harness(运行 UDF 的环境)。
- Data Service: 负责提供 Runner 和 UDF 间的数据流传输。
- State Service: 负责提供 UDF 中 State 的管理。
- Logging Service: 负责接收并处理 UDF 的输出日志。
相比之下,Flink 提供多了一个 Metrics Service 来采集 UDF 产生的 metric。
无论是 Beam 或事 Flink,UDF 架构中 Data Service 和 State Service 涉及到计算效率和数据准确性,是最为核心的服务。更加重要的是,两者的通信管道都是双工的,因此在实现上也最为复杂。下文将结合 UDF 的类型重点说明这两个服务的架构。
Table API UDF 分类
熟悉 Hive 的同学应该知道用户定义的函数除了 UDF、还有 UDAF(User Defined Aggregate Function)和 UDTF(User Defined Table Function),不过为了方便起见都称为广义的 UDF 或者 UDX。三者主要不同在于输入行数与输出行数的关系,可以用以下的表格概括,其中 UDF、UDAF 和 UDTF 分别对应 Table API 的 ScalarFcuntion、AggregateFunction 和 TableFunction。
其中 ScalarFcuntion 和 TableFunction 输入均为一行,因此没有涉及到跨行的状态处理,属于无状态的 UDF;与之相对,AggregateFunction 以多行为输入,涉及中间状态,属于有状态的 UDF。下面分别讨论对于该两种 UDF 的实现。
无状态的 UDF
对于无状态的 UDF 来说,最为重要的服务是 DataService。DataService 负责 JVM 和 Python 解释器间的数据传输,性能极其关键并需要重点优化,这主要分为数据处理模式(Data Processing Mode)和数据传输模式(Data Transmission Mode)两个方面。
数据处理模式指的是 UDF 以同步还是异步的方式处理数据。同步的处理方式意味着 UDF 将逐条处理数据,在一条数据被处理完并输出前,Flink Operator 不会将下一条数据发送给 UDF。相对地,异步的处理方式意味着数据传输和 UDF 计算解耦,Flink Operator 可以将数据批量传输到 UDF 的输入队列缓存起来,UDF 计算的输出也可以先放入 UDF 的输出队列。
数据传输模式指的是 Flink Operator 与 UDF 间的数据传输是逐条传输还是批量传输。值得注意的是,逐条传输可以和同步数据处理和异步数据处理配合使用,批量传输则必须和异步的数据处理模式配合使用。
显而易见,性能关键的 DataService 使用异步数据处理模式加上批量传输是更合适的。总体的架构会如下:
首先 Flink Operator 将摄入的数据放到 input buffer 并发送至 Python Worker,随后该 buffer 变为 waiting buffer 仍保存原始数据;然后 Python Worker 端将 input buffer 的数据逐条交给 UDF 执行,其输出结果作为 result buffer 发回给 Flink Operator;最后 Flink Operator 将 result buffer 和 waiting buffer 合并生成最终结果并发给下游。对于为什么需要 waiting buffer,笔者猜测是 rpc 发给 Python Worker 的数据只包含了 UDF 调用的列,所以 result buffer 并不是完整数据,需要和 waiting buffer 合并才是最终结果。
有状态的 UDF
有状态的 UDF 是最为复杂的一类 UDF,因为 Python Worker 需要和 Flink Operator 同步 state 信息。
相比起无状态的 UDF,最大的不同有两点:
- 数据先执行 group 后再发至 Python Worker。这样的好处是 UDF 输入是按 key 排序的,所以方便合并 UDF 的输出。另外一点是减轻了 Python Worker 的负担,Python Worker 在同一时间只会处理一个 key 的数据,不需要维持多个 key 的状态。
- UDF 可以访问 Flink Operator 端的 state。Flink Operator 的 state 会通过 State Service 以 DataView 的形式暴露给 Python Worker,后者对 state 的操作会映射到 StateBackend。为了避免频繁的 state 访问,Python Worker 会缓存部分 state 数据,这部分 state 会在 checkpoint 快照期间同步会 Flink Operator 以确保一致性。
总结
实时计算引擎对非 JVM 语言的支持将主要分为客户端的 SDK 和运行时的 UDF 两部分,其中最为核心的部分是定义跨语言的 rpc 协议层来负责不同语言运行环境间的数据通信。对于 Flink 来说,要支持非 JVM 语言有两种途径: 通过集成 Apache Beam 或者实现原生支持。从长远来说,和 Apache Beam 集成应该没有太大疑问,而对于原生支持则尚未确定,所以社区会先通过实现较为独立的 Python Table API 来作为后续计划的探索。