网易游戏 FlinkSQL 平台化实践

随着近年来流式 SQL 理论逐渐完善,在实时流计算场景中的提供与离线批计算类似的 SQL 开发体验成为可能,GCP Dataflow、Apache Flink、Apache Kafka、Apache Pulsar 都纷纷推出 SQL 支持。在开源领域中,Flink SQL 毫无疑问是流式 SQL 领域最为流行的框架之一,但由于 Flink SQL 缺乏类似 Hive Server2 的服务端组件,各大厂对 Flink SQL 平台化的实现方案各不相同,而本文将介绍在网易游戏在 Flink SQL 平台化上的探索和实践。

发展历程

Flink SQL 的平台化与实时计算平台的架构密不可分,下文将简单介绍网易游戏实时计算平台的发展历程。

网易游戏实时计算平台 Streamfly 取名自电影《驯龙高手》中的 Stormfly,由于显然我们已经从 Storm 迁移到 Flink,所以将 Stormfly 中的 Storm 替换成了更为通用的 Stream。

图1. Streamfly 发展历程

Streamfly 建立于 2019 年,前身是离线作业平台 Omega 下的名为 Lambda 的子系统。Lambda 作为实时作业平台,在设计之初支持 Storm、Spark Streaming 和 Flink 三种实时计算框架。出于松耦合设计和公司技术栈的考虑,Lambda 以 Golang 作为开发语言,并采用与 YARN 类似的动态生成 shell 脚本的方式来调用不同框架的命令行接口。这样松耦合的接口方式给我们带来很大的灵活性,比如我们可以轻松支持多个版本的 Flink,不需要强制用户随着系统版本升级,但同时也为后续的 Flink SQL 平台化方案埋下了伏笔。

在 2019 年底的时候,我们对 Flink SQL 平台化做了第一次的探索,即 StreamflySQL v1。考虑到 Streamfly 本身是 Golang 所写,无法调用 Flink client 的本地,而且当时 Flink Client 接口仍不太适合平台集成[1],于是我们决定使用通用模版 jar 加上包含 SQL 的作业配置的方式来实现 Flink SQL 平台化。

然而,由于基于 jar 的方式带来的用户体验问题以及当时 Flink SQL 特性还不够完善,StreamflySQL v1 上线以后并未获得用户的青睐,大多数用户在调研之后仍继续使用 jar 的方式来开发和管理 Flink 应用。因此,我们在 2020 年年底对 Flink SQL 进行了新一轮调研,对 StreamflySQL 进行了重构,即 StreamflySQL v2。

StreamflySQL v2 弃用了通用 jar 的方式,而是在原有 Lambda 作业平台之上新建了 Flink SQL 平台服务端,提供类似 Hive Server2 或者 Kyuubi 的纯 SQL 服务,极大地提升了用户体验。

StreamflySQL v1(基于模板 jar)

实现方案

如上文所述,StreamflySQL v1 使用基于通用模版 jar 加配置的方式来实现 Flink SQL 作业,主要包含三个模块: Flink 模板 jar、作为配置中心的后端和提供 SQL 编辑器交互的前端。总体架构如下图所示(为简洁只画出请求的路径,省略了返回的路径)。

图2. Streamfly v1 架构

一个 Flink SQL 作业的提交流程为:

  1. 用户打开 SQL 编辑器,前端请求元数据,包括 Catalog、Database、Table 等。
  2. 后端转发请求给对应数据源的元数据中心(网易游戏采用分散式的元数据管理,即各组件管理自己的元数据并提供 REST API)。
  3. 用户根据元数据写好 SQL,设置内存、并行度等作业运行配置,提交作业。
  4. 后端对 SQL 和配置进行检查,调用 Lambda API 基于预上传的模版 jar 创建并启动作业。
  5. Lambda 执行 flink run 命令,启动 Flink client 进程。
  6. Flink Client 加载并执行模板 jar 的 main 函数,其中会注册多个 Catalog,并通过这些 Catalog 访问元数据。
  7. Client 进程完成 SQL 的解析、优化,以 per-job 模式提交 YARN application 和 Flink JobGraph。

对于 Lambda 实时作业平台而言,Flink SQL 作业与其他作业无异,除了作业新建是由 StreamflySQL 后端自动生成以外,其他都运维管理都可以直接在 Lambda 上操作。事实上,除了 StreamflySQL v1,网易游戏还有很多基于 Flink 的服务都是以模板 jar 加动态作业配置的架构实现的。

痛点

基于模板 jar 的方式可以较为简单地实现 Flink SQL 的需求,但距离像传统 RDBMS 或者 Hive 的 SQL 终端还有很大的差距,最主要的痛点有以下几个。

1. 响应慢

对于每个 SQL 作业,StreamflySQL v1 都需要启动一个 Flink Client 进程并提交一个 YARN application,其中 JVM 启动、上传依赖到 HDFS 和等待 YARN 分配 container 来启动 jobmanager 都需要比较长的时间,总体下来通常要 1-2 分钟。尤其对于一些复杂的 SQL,由于 SQL 优化的时间较长,总体的时间可能需要 5 分钟以上。

虽然 Lambda 中 Flink Client 的执行部分是异步的,但用户仍需要等到作业顺利在集群上成功跑起来或者报错退出才可以确认最终提交结果,这样的响应时间对于需要多次调整 SQL 的用户来说是十分影响效率的。

2. 调试难

新开发 Flink SQL 作业通常需要一个调试阶段,而调试的基本需求是:

  • 调试的 SQL 和最终的线上 SQL 保持一致
  • 不能影响对线上的数据产生影响
  • 能方便快捷地获取到执行结果

针对调试需求,StreamflySQL v1 使用替换 Sink 方式来对于数据进行隔离,即提供一个调试的选项,如果开启则在 Flink SQL 翻译 JobGraph 时将原有的 Sink 替换为一个 PrintSink,并且用本地启动 Flink MiniCluster 执行而不是提交到 YARN 集群。PrintSink 会将输出打印以特定格式到标准输出里(并提供限流功能),而日志将在作业结束或者超时后(调试作业最多执行 15 分钟)被一并返回给 StreamflySQL 服务端。服务端会将其中属于输出结果的部分从日志提取出来,返回前端展示。

图3. Streamfly v1 调试实现方案

比起正式执行,调试采用本地执行省去了初始化远程执行环境(YARN application)的时间,同时本地进程更方便采集标准输出。这样的实现方案对于简单的作业而言是可行的,然而缺点也很明显:

  • 对于比较复杂的作业,SQL 优化可能会占用大部分调试时间导致超时,并给 Lambda 服务端造成比较大的压力。
  • 无法调试时间窗口较长的作业或者需要 Bootstrap State 的作业。
  • 执行结果需要等作业结束时一并返回,而不能流式返回,因此用户仍需要等 10 分钟以上。
  • 对 Flink Table 模块入侵比较多,完全不优雅。

3. 只能执行单条 DML

由于 StreamflySQL v1 只支持作业类型的 SQL 语句,所以只能执行形如 insert into ... select ... 的 DML 语句,无法执行 DDL(比如 create table)、DSL(比如 select) 或 DCL(比如 grant)。而实际上,如果要专门创建一个 Flink Environment 去执行一条 DDL 等语句也明显 overkilled,况且许多 DDL 只对当前 Environment 生效,比如 set 语句。这导致 SQL 编辑器变得有些空有其表,实际能支持的操作十分有限,根本原因是 Environment 生命周期与 Flink Client 进程绑定,而缺乏常驻的 Environment。

此外,由于当时 Flink SQL 还不支持 StatementSet 的多条 SQL 执行,所以 DML 也被限制为一条,这很大程度上限制了批处理用户的使用(虽然目前在 Streamfly 上使用 Flink 做批处理的用户并不多)。

StreamflySQL v2(基于 SQL Gateway)

实现方案

由于 StreamflySQL v1 的种种问题,加上社区出现更多可以借鉴的 Flink SQL 落地经验,在 2020 年底我们对 Flink SQL 平台化方案进行了新一轮的调研,并最终选择了基于 Ververica 的 Flink SQL Gateway[3] 进行新的 Flink SQL 平台开发。

Flink SQL Gateway 是一个类似 Spark Thrift Server 的应用,提供基于 REST API 的 SQL 接口,但只是一个原型,不具备生产级别的特性。针对于此,我们对其进行了多项改进(下文会逐项解释),并集成到 SpringBoot 应用里,即 StreamflySQL v2 服务端(为简单起见,下文 StreamflySQL 默认指 v2 版本)。

其中有个比较关键的问题是,StreamflySQL(或者说 SQL Gateway)有和 Lambda 一样的提交作业能力,那么该二者间的关系是如何?如果 StreamflySQL 绕过 Lambda 提交作业,那么相当于有两个独立入口,认证授权、监控告警、计费、审计等通用功能都需要重复建设,而且非常不利于统一管理。

经过研究,我们最终定下的方案是: 利用 Flink Session Cluster 的资源和作业分离的特性来对两个系统进行分工。具体而言,Lambda 需要新增 Session Cluster 的作业类型,而 StreamflySQL 首先调用 Lambda 新建 Session Cluster,此后再直接和 Session Cluster 交互,包括 SQL 提交和作业管理等。这样的好处是能复用 Lambda 的大部分能力,Lambda 仍然作为运维管理的唯一入口。

图4. Streamfly v2 架构

新的架构下,用户执行 SQL 的流程如下:

  1. 首先初始化 SQL 会话,若已有则跳转至步骤 5。
  2. 后端创建 Lambda Session Cluster 类型作业并启动。
  3. Lambda 执行 yarn-session.sh 启动 Flink client 进程。
  4. Flink client 提交 YARN application 初始化 Flink Session Cluster。
  5. 用户提交 SQL。
  6. 后端解析 SQL,判断是会生成 Flink 作业 DML/DSL 则执行步骤 7,否则直接通过 Catalog 执行并返回结果。
  7. 后端完成 SQL 优化和翻译,编译 Flink JobGraph 提交至 Flink Session Cluster。

新版 Streamfly 大大改善了用户体验,获得不错的效果,但开发过程中并不是一帆风顺,下文将分享我们遇到的主要挑战和解决方案。

挑战及解决方案

1. 元数据持久化

社区的 Flink SQL Gateway 在生产中应用的最大难点在于会话、作业的元数据并没有持久化,这意味着如果进程重启,所以元数据都会丢失。上文有提到,我们将 Flink SQL Gateway 集成到 SpringBoot 项目里,因此很自然地将 SQL Gateway 作为封装一个 Service,并将元数据存储到数据库。本地的 Flink Environment 会作为缓存,若不存在则自动从数据库重建。

此外,在 SQL Gateway 原本会在启动时加载 Flink Configuration 且会用于创建所有会话,然而在实际场景中,不同会话会有不同的配置,最典型的便是 cluster ID(on-YARN 环境下即 YARN application ID)。因此我们提供了运行时的配置覆盖功能,即为每个会话存储优先级更高的配置项,在初始化会话创建 Environment 时,系统会合并从 FLINK_CONF_DIR 加载而来的默认配置和数据库存储的配置项。

2. 多租户(认证/资源)

网易游戏大部分组件都使用 Kerberos 认证,而认证是 SQL Gateway 原先并不具备的。更加关键的是,由于 StreamflySQL 是通用平台,必需支持多租户的能力。用过 Hadoop 生态 Kerberos 集成的同学应该了解,这并不是一件容易的事。主要原因是 Hadoop 提供的 Kerberos 接口 UserGroupInformation(下简称 UGI)的很多状态是 static 的,这意味着认证是 JVM 级别的。

摆在我们面前的选项有两个: 一是分别用自定义 Classloader 包住每个会话(底层是一个 Flink Environment),因此各自的 UGI 是隔离的;二是利用 Hadoop 的 proxy user 特性,将 StreamflySQL 设置为超级用户,并伪装(impersonate)成代理的用户。

因为 SQL Gateway 本来就有会话级别的 UserCodeClassloader,所以一开始我们尝试了 Classloader 隔离的方案。然而由于 UGI 的使用散落在各个组件 lib 的代码中,要完全将 Hadoop 生态相关的调用模块化难度较大,后续我们转为了 proxy user 的方案。具体来说,系统先登录为超级用户,然后替代理用户获取不同组件的 delegation token,最后伪装为代理用户以 delegation token 而不是 Kerberos TGT 来进行认证。

另外在多租户的资源隔离方面,我们底层通过 Lambda 在用户自己的队列启动 Session Cluster,因此每个用户的集群资源天然就是隔离的,避免了类似 Spark Thrift Server 只能使用公用队列而导致资源混用的问题。

3. 水平拓展

上文提到我们将主要的状态存储到数据库,StreamflySQL 服务端基本是无状态的,因此可以方便地水平拓展。水平拓展能力对于 StreamflySQL 尤为重要,除单点问题以外,Flink SQL 优化编译有可能打满 CPU 单核,单实例的资源显然是不够的。然而如果同一个会话的 SQL 请求随机被分发到多个不同的实例,则会导致每个服务实例都需要初始化一个 Environment,浪费资源并导致响应时间显著增长。更加重要的是,会话某些状态是不合适持久化的,比如 select DSL 会让 StreamflySQL 服务端开启一个 TCP 链接接收从集群作业回传的结果数据集,如果同个会话的请求被路由到其他实例,显然无法读取到结果。

为此我们采用了亲和性的负载均衡策略,基于会话 ID 进行路由,确保同一个会话的请求尽可能路由到同一个实例。当然这只确保大部分情况下正常,如果服务实例挂掉或者维护重启,那么 select 的结果还是会丢失。但考虑到 select 一般用于调试或数据探索,所以是可以接受的范围内。

4. 作业状态管理

社区的 SQL Gateway 未考虑作业的 SQL 状态,而这在生产环境是不可缺失的。对于普通基于 jar 的作业而言,Lambda 在作业启动时会默认搜索最近的一个成功的 Checkpoint 或 Savepoint 用于恢复,具体策略如下:

  1. 若上一次执行最后用 stop/canel with savepoint 方式停止并成功留下 Savepoint 则用该 Savepoint 恢复。
  2. 若上次执行未留下成功 Savepoint,则从该作业所有执行的 Savepoint/Checkpoint 路径查找,取修改时间最近一个用于作业恢复。注意这里查找范围不是上次执行而是全部执行,原因是在作业变更出现异常需要回滚的情况下,目标的 Checkpoint 可能是在更早的几次执行的目录里。

然而对于 StreamflySQL 来说,却不能直接复用 Lambda 这个功能,一方面是因为 Lambda 只管 Session Cluster 的健康状态,对里面的 Flink SQL 作业并不知情,另一方面是 Lambda 的搜索策略基于 Per-Job 模式每个集群只有一个 Flink Job 的假设,否则有可能会搜索到别的 Job 的 Checkpoint。

针对这个问题 StreamflySQL 实现了类似 Flink History Server 的基于 JobManager archive 的查找策略,即通过 Flink Job ID 找到已完成作业的最后状态信息,提取其中 Checkpoint 列表,获取其中最新完成的 Checkpoint。若无已完成 Checkpoint,则取 Restored Checkpoint 用于恢复状态。

未来展望

目前 StreamflySQL v2 虽然基本达到了预期的效果,但仍在计划中的事项还有非常多,其中比较重要的有:

  1. Flink SQL 作业的状态迁移(State Migration),即用户对 SQL 进行变更后,如何从原先的 Savepoint 进行恢复。这点据笔者了解业界暂时没有很好的办法,只能通过变更类型来告知用户风险,比如通常而言加减字段不会造成 Savepoint 的不兼容,但如果新增一个 join 表,造成的影响就很难说了。因此后续 StreamflySQL 可能会加入默认开启的执行计划分析,来告知用户变更前后的状态兼容性,如果需要的话,可能允许用户强制覆盖自动生成 Operator ID。
  2. Flink SQL 作业目前仍未支持细粒度的资源管理,用户只能通过作业级别的并行度和会话级别的 TaskManager 内存设置来控制资源,这对于在同一会话中运行多个作业的场景不太友好。后续希望可以参与并推动社区在 Table/SQL API 细粒度资源配置方面的进度。
  3. StreamflySQL 在 SQL Gateway 上进行了不少的改进,其中比较通用的 commit 希望可以推回给社区,推动 FLIP-91[4] 的进展。

参考

  1. FLIP-74: Flink JobClient API
  2. Flink SQL 1.11 on Zeppelin 平台化实践
  3. Flink SQL Gateway
  4. FLIP-91: Support SQL Client Gateway