相信不少读者在开发 Flink 应用时或多或少会遇到在内存调优方面的问题,比如在我们生产环境中遇到最多的 TaskManager 在容器化环境下占用超出容器限制的内存而被 YARN/Mesos kill 掉[1],再比如使用 heap-based StateBackend 情况下 State 过大导致 GC 频繁影响吞吐。这些问题对于不熟悉 Flink 内存管理的用户来说十分难以排查,而且 Flink 晦涩难懂的内存配置参数更是让用户望而却步,结果是往往将内存调大至一个比较浪费的阈值以尽量避免内存问题。
对于作业规模不大的普通用户而言,这些通常在可以接受的范围之内,但对于上千并行度的大作业来说,浪费资源的总量会非常可观,而且进程的不稳定性导致的作业恢复时间也会比普通作业长得多,因此阿里巴巴的 Blink 团队针对内存管理机制做了大量的优化,并于近期开始合并到 Flink。本文的内容主要基于阿里团队工程师宋辛童在 Flink Forward Beijing 的分享[1],以及后续相关的几个 FLIP 提案。
Flink 目前(1.9)的内存管理
TaskManager 作为 Master/Slave 架构中的 Slave 提供了作业执行需要的环境和资源,最为重要而且复杂,因此 Flink 的内存管理也主要指 TaskManager 的内存管理。
TaskManager 的资源(主要是内存)分为三个层级,分别是最粗粒度的进程级(TaskManager 进程本身),线程级(TaskManager 的 slot)和 SubTask 级(多个 SubTask 共用一个 slot)。
在进程级,TaskManager 将内存划分为以下几块:
- Heap Memory: 由 JVM 直接管理的 heap 内存,留给用户代码以及没有显式内存管理的 Flink 系统活动使用(比如 StateBackend、ResourceManager 的元数据管理等)。
- Network Memory: 用于网络传输(比如 shuffle、broadcast)的内存 Buffer 池,属于 Direct Memory 并由 Flink 管理。
- Cutoff Memory: 在容器化环境下进程使用的物理内存有上限,需要预留一部分内存给 JVM 本身,比如线程栈内存、class 等元数据内存、GC 内存等。
- Managed Memory: 由 Flink Memory Manager 直接管理的内存,是数据在 Operator 内部的物理表示。Managed Memory 可以被配置为 on-heap 或者 off-heap (direct memory)的,off-heap 的 Managed Memory 将有效减小 JVM heap 的大小并减轻 GC 负担。目前 Managed Memory 只用于 Batch 类型的作业,需要缓存数据的操作比如 hash join、sort 等都依赖于它。
根据 Managed Memory 是 on-heap 或 off-heap 的不同,TaskManager 的进程内存与 JVM 内存分区关系分别如下:
在线程级别,TaskManager 会将其资源均分为若干个 slot (在 YARN/Mesos/K8s 环境通常是每个 TaskManager 只包含 1 个 slot),没有 slot sharing 的情况下每个 slot 可以运行一个 SubTask 线程。除了 Managed Memory,属于同一 TaskManager 的 slot 之间基本是没有资源隔离的,包括 Heap Memory、Network Buffer、Cutoff Memory 都是共享的。所以目前 slot 主要的用处是限制一个 TaskManager 的 SubTask 数。
从作为资源提供者的 TaskManager 角度看, slot 是资源的最小单位,但从使用者 SubTask 的角度看,slot 的资源还可以被细分,因为 Flink 的 slot sharing 机制。默认情况下, Flink 允许多个 SubTask 共用一个 slot 的资源,前提是这些 SubTask 属于同一个 Job 的不同 Task。以官网的例子来说,一个拓扑为 Source(6)-map(6)-keyby/window/apply(6)-sink(1)
的作业,可以运行在 2 个 slot 数为 3 的 TaskManager 上(见图3)。
这样的好处是,原本一共需要 19 个 slot 的作业,现在只需要作业中与 Task 最大并行度相等的 slot, 即 6 个 slot 即可运行起来。此外因为不同 Task 通常有不同的资源需求,比如 source 主要使用网络 IO,而 map 可能主要需要 cpu,将不同 Task 的 subtask 放到同一 slot 中有利于资源的充分利用。
可以看到,目前 Flink 的内存管理是比较粗粒度的,资源隔离并不是很完整,而且在不同部署模式下(Standalone/YARN/Mesos/K8s)或不同计算模式下(Streaming/Batch)的内存分配也不太一致,为深度平台化及大规模应用增添了难度。
Flink 1.10 细粒度的资源管理
为了改进 Flink 内存管理机制,阿里巴巴的工程师结合 Blink 的优化经验分别就进程、线程、SubTask(Operator)三个层面分别提出了 3 个 FLIP,均以 1.10 为目标 release 版本。下面将逐一介绍每个提案的内容。
FLIP-49: 统一 TaskExecutor 的内存配置
背景
TaskExecutor 在不同部署模式下具体负责作业执行的进程,可以简单视为 TaskManager。目前 TaskManager 的内存配置存在不一致以及不够直观的问题,具体有以下几点:
- 流批作业内容配置不一致。Managed Memory 只覆盖 DataSet API,而 DataStream API 的则主要使用 JVM 的 heap 内存,相比前者需要更多的调优参数且内存消耗更难把控。
- RocksDB 占用的 native 内存并不在内存管理里,导致使用 RocksDB 时内存需要很多手动调优。
- 不同部署模式下,Flink 内存计算算法不同,并且令人难以理解。
针对这些问题,FLIP-49[4] 提议通过将 Managed Memory 的用途拓展至 DataStream 以解决这个问题。DataStream 中主要占用内存的是 StateBackend,它可以从管理 Managed Memory 的 MemoryManager 预留部分内存或分配内存。通过这种方式同一个 Flink 配置可以运行 Batch 作业和 Streaming 作业,有利于流批统一。
改进思路
总结一下,目前 DataStream 和 DataSet 的内存使用可以分为如下几类:
场景 | 内存类型 | 内存分配方式 | 内存限制 |
---|---|---|---|
Streaming(Heap-based StateBackend) | heap | 隐式分配 | 小于 JVM heap size |
Streaming(RocksDB StateBackend) | off-heap | 隐式分配 | 只受限于机器内存 |
Batch | heap 或 off-heap | 显式通过 MemoryManager 分配 | 不大于显式分配的内存数 |
可以看到目前 DataStream 作业的内存分配没有经过 MemoryManager 而是直接向 JVM 申请,容易造成 heap OOM 或者物理内存占用过大[3],因此直接的修复办法是让 MemoryManager 了解到 StateBackend 的内存占用。这会有两种方式,一是直接通过 MemoryManager 申请内存,二是仍使用隐式分配的办法,但需要通知 MemoryManager 预留这部分内存。此外 MemoryManager 申请 off-heap 的方式也会有所变化,从 ByteBuffer#allocateDirect()
变为 Unsafe#allocateMemory()
,这样的好处是显式管理的 off-heap 内存可以从 JVM 的 -XX:MaxDirectMemorySize
参数限制中分离出来。
另外 MemoryManager 将不只可以被配置为 heap/off-heap,而是分别拥有对应的内存池。这样的好处是在同一个集群可以运行要求不同类型内存的作业,比如一个 FsStateBackend 的 DataStream 作业和一个 RocksDBStateBackend 的 DataStream 作业。heap/off-heap 的比例可以通过参数配置,1/0 则代表了完全的 on-heap 或者 off-heap。
改进之后 TaskManager 的各内存分区如下:
分区 | 内存类型 | 描述 | 配置项 | 默认值 |
---|---|---|---|---|
Framework Heap Memory | heap | Flink 框架消耗的 heap 内存 | taskmanager.memory. framework.heap |
128mb |
Task Heap Memory | heap | 用户代码使用的 heap 内存 | taskmanager.memory. task.heap |
无 |
Task Off-Heap Memory | off-heap | 用户代码使用的 off-heap 内存 | taskmanager.memory. task.offheap |
0b |
Shuffle Memory | off-heap | 网络传输/suffle 使用的内存 | taskmanager.memory. shuffle.[min/max/fraction] |
min=64mb, max=1gb, fraction=0.1 |
Managed Heap Memory | heap | Managed Memory 使用的 heap 内存 | taskmanager.memory. managed.[size/fraction] |
fraction=0.5 |
Managed Off-heap Memory | off-heap | Managed Memory 使用的 off-heap 内存 | taskmanager.memory. managed.offheap-fraction |
0.0 |
JVM Metaspace | off-heap | JVM metaspace 使用的 off-heap 内存 | taskmanager.memory.jvm-metaspace | 192mb |
JVM Overhead | off-heap | JVM 本身使用的内存 | taskmanager.memory.jvm-overhead.[min/max/fraction] | min=128mb, max=1gb, fraction=0.1) |
Total Flink Memory | heap & off-heap | Flink 框架使用的总内存,是以上除 JVM Metaspace 和 JVM Overhead 以外所有分区的总和 | taskmanager.memory.total-flink.size | 无 |
Total Process Memory | heap & off-heap | 进程使用的总内存,是所有分区的总和,包括 JVM Metaspace 和 JVM Overhead | taskmanager.memory.total-process.size | 无 |
值得注意的是有 3 个分区是没有默认值的,包括 Framework Heap Memory、Total Flink Memory 和 Total Process Memory,它们是决定总内存的最关键参数,三者分别满足不同部署模式的需要。比如在 Standalone 默认下,用户可以配置 Framework Heap Memory 来限制用户代码使用的 heap 内存;而在 YARN 部署模式下,用户可以通过配置 YARN container 的资源来间接设置 Total Process Memory。
FLIP-56: 动态 slot 分配
背景
目前 Flink 的资源是预先静态分配的,也就是说 TaskManager 进程启动后 slot 的数目和每个 slot 的资源数都是固定的而且不能改变,这些 slot 的生命周期和 TaskManager 是相同的。Flink Job 后续只能向 TaskManager 申请和释放这些 slot,而没有对 slot 资源数的话语权。
这种粗粒度的资源分配假定每个 SubTask 的资源需求都是大致相等的,优点是较为简单易用,缺点在于如果出现 SubTask 的资源需求有倾斜的情况,用户则需要按其中某个 SubTask 最大资源来配置总体资源,导致资源浪费且不利于多个作业复用相同 Flink 集群。
改进思路
FLIP-56[5] 提议通过将 TaskManager 的资源改为动态申请来解决这个问题。TaskManager 启动的时候只需要确定资源池大小,然后在有具体的 Flink Job 申请资源时再按需动态分配 slot。Flink Job 申请 slot 时需要附上资源需求,TaskManager 会根据该需求来确定 slot 资源。
值得注意的是,slot 资源需求可以是 unknown
。提案引入了一个新的默认 slot 资源要求配置项,它表示一个 slot 占总资源的比例。如果 slot 资源未知,TaskManager 将按照该比例切分出 slot 资源。为了保持和现有静态 slot 模型的兼容性,如果该配置项没有被配置,TaskManager 会根据 slot 数目均等分资源生成 slot。
目前而言,该 FLIP 主要涉及到 Managed Memory 资源,TaskManager 的其他资源比如 JVM heap 还是多个 slot 共享的。
FLIP-53: 细粒度的算子资源管理
背景
FLIP-56 使得 slot 的资源可以根据实际需求确定,而 FLIP-53 则探讨了 Operator (算子)层面如何表达资源需求,以及如何根据不同 Operator 的设置来计算出总的 slot 资源。
目前 DataSet API 以及有可以指定 Operator 资源占比的方法(TaskConfig 和 ChainedDriver),因此这个 FLIP 只涉及到 DataStream API 和 Table/SQL API (先在 Blink Planner 实现)。不过提案并没有包括用户函数 API 上的变化(类似新增 dataStream.setResourceSpec()
函数),而是主要讨论 DataStream 到 StreamGraph 的翻译过程如何计算 slot 资源。改进完成后,这三个 API 的资源计算逻辑在底层会是统一的。
改进思路
要理解 Flink 内部如何划分资源,首先要对 Flink 如何编译用户代码并部署到分布式环境的过程有一定的了解。
以 DataStream API 为例,用户为 DataStream 新增 Operator 时,Flink 在底层会将以一个对应的 Transform 来封装。比如 dataStream.map(new MyMapFunc())
会新增一个 OneInputTransformation
实例,里面包括了序列化的 MyMapFunc
实例,以及 Operator 的配置(包括名称、uid、并行度和资源等),并且记录了它在拓扑中的前一个 Transformation 作为它的数据输入。
当 env.execute()
被调用时,在 client 端 StreamGraphGenerator 首先会遍历 Transformation 列表构造出 StreamGraph 对象(每个 Operator 对应一个 StreamNode),然后 StreamingJobGraphGenerator 再将 StreamGraph 翻译成 DataStream/DataSet/Table/SQL 通用的 JobGraph(此时会应用 chaining policy 将可以合并的 Operator 合并为 OperatorChain,每个 OperatorChain 或不能合并的 Operator 对应一个 JobVertex),并将其传给 JobManager。
JobManager 收到 JobGraph 后首先会将其翻译成表示运行状态的 ExecutionGraph,ExecutionGraph 的每个节点称为 ExecutionJobVertex,对应一个 JobVertex。ExecutionJobVertex 有一个或多个并行度且可能被调度和执行多次,其中一个并行度的一次执行称为 Execution,JobManager 的 Scheduler 会为每个 Execution 分配 slot。
细粒度的算子资源管理将以下面的方式作用于目前的流程:
- 用户使用 API 构建的 Operator(以 Transformation 表示)会附带
ResourceSpecs
,描述该 Operator 需要的资源,默认为unknown
。 - 当生成 JobGraph 的时候,StreamingJobGraphGenerator 根据
ResourceSpecs
计算出每个 Operator 占的资源比例(主要是 Managed Memory 的比例)。 - 进行调度的时候,Operator 的资源将被加总成为 Task 的
ResourceProfiles
(包括 Managed Memory 和根据 Task 总资源算出的 Network Memory)。这些 Task 会被划分为 SubTask 实例被部署到 TaskManager 上。 - 当 TaskManager 启动 SubTask 的时候,会根据各 Operator 的资源占比划分 Slot Managed Memory。划分的方式可以是用户指定每个 Operator 的资源占比,或者默认均等分。
值得注意的是,Scheduler 的调度有分 EAGER 模式和 LAZY_FROM_SOURCE 两种模式,分别用于 Stream 作业和 Batch 作业,它们会影响到 slot 的资源计算。Stream 类型的作业要求所有的 Operator 同时运行,因此资源的需求是急切的(EAGER);而 Batch 类型的作业可以划分为多个阶段,不同阶段的 Operator 不需要同时运行,可以等输入数据准备好了再分配资源(LAZY_FROM_SOURCE)。这样的差异导致如果要充分利用 slot,Batch 作业需要区分不同阶段的 Task,同一时间只考虑一个阶段的 Task 资源。
解决的方案是将 slot sharing 的机制拓展至 Batch 作业。默认情况下 Stream 作业的所有 Operator 都属于 default sharing group,所以全部 Operator 都能共用都一个 slot。对于 Batch 作业而言,我们将整个 JobGraph 根据 suffle 划分为一至多个 Region,每个 Region 属于独立的 sharing group,因而不会被放到同一个 slot 里面。
总结
随着 Flink 的越来越大规模地被应用于各种业务,目前资源管理机制的灵活性、易用性不足的问题越发凸显,新的细粒度资源管理机制将大大缓解这个问题。此外,新资源管理机制将统一流批两者在 runtime 层资源管理,这也为将最终的流批统一打下基础。对于普通用户而言,这里的大多数变动是透明的,主要的影响应该是出现新的内存相关的配置项需要了解一下。
1.[FLINK-13477] Containerized TaskManager killed because of lack of memory overhead
2.机遇与挑战:Apache Flink 资源管理机制解读与展望
3.[FLINK-7289] Memory allocation of RocksDB can be problematic in container environments
4.FLIP-49: Unified Memory Configuration for TaskExecutors
5.FLIP-56: Dynamic Slot Allocation
6.FLIP-53: Fine Grained Operator Resource Management