Executor是执行用户代码的“一线工人”(worker),理解Executor的运行机制无论对编写高效优雅的Spark Application还是对Task的troubleshooting都尤为重要。
注:吸取了broadcast包1.6版本和2.x版本差异较大的教训,我决定直接跳到今年7月发布的Spark 2.2.0版本,之后的所有笔记也会基于2.2.0版本。
Executor与Driver间的通信
ExecutorBackend为executor后台服务,它与Driver通信,管理了executor从初始化注册、运行任务到停止释放资源的整个生命周期,并向Driver报告executor和在上面执行的task的状态。除了心跳则直接向Driver报告,executor所有与Driver的通信都通过ExecutorBackend完成。
Executor
Executor是一个JVM实例,持有可以同时运行多个task attempt的工作线程池,并负责task的监控和管理。
Executor Variable
currentFiles
: Executor当前工作目录的文件。currentJars
: Executor当前classpath的jar。EMPTY_BYTE_BUFFER
: 空的byte数组,用于更新executor状态而不需要发送实际数据的情况。conf
: SparkConf实例,保存了用户设置与系统默认的Spark配置。threadPool
: 工作线程池,线程是UninterruptibleThread类型,以防被外部中断。executorSource
: 监控指标源,用于统计executor的执行情况指标。taskReaperPool
: task清理器线程池,用于cancel或者kill task。taskReaperForTask
: taskId->taskReaper的HashMap,它是同步的,以确保不会为某个task重复创建taskReaper。userClassPathFirst
: SparkConf中的”spark.executor.userClassPathFirst”配置项,用于决定executor先加载系统classpath还是用户classpath,默认false。taskReaperEnabled
: SparkConf中的”spark.task.reaper.enabled”,用于决定是否启用taskReaper特性,默认false。urlClassLoader
: 通过url加载class的ClassLoader。replClassLoader
: REPL使用的ClassLoader,会在运行时加载用户新定义的类。maxDirectResultSize
: SparkConf中的”spark.task.maxDirectResultSize”,直接发送计算结果的数据量阈值,超过这个阈值用BlockManager来发送,默认512M。maxResultSize
: executor允许的最大结果集,超过这个数量的结果集将不能返回给Driver,默认1G。runningTasks
: taskId->taskRunner的ConcurrentHashMap,维护taskId到taskRunner的映射。heartbeater
: 维持与Driver间心跳的线程池。heartbeaterReceiverRef
: Driver的心跳接收器引用,这个变量应该在开始executor心跳之前初始化。HEARTBEAT_MAX_FAILURES
: SparkConf中的”spark.executor.heartbeat.maxFailures”,最大允许的心跳连续失败次数。heartbeatFailutres
: 心跳的连续失败次数,它应该被heartThread访问,每次成功的心跳将它清零。
Executor Method
numRunningTasks
:
返回正在运行的task数目。launchTask
:
将taskDescription交给一个taskRunner,将后者注册到runningTasks并放入threadPool执行。killTask
:
根据taskId杀死task,interruptThread参数决定了是否要中断正在执行的线程. 这个函数在taskReaperEnabled为true的情况下会使用taskReaper来杀死task,否则直接从runningTasks中获取对应的taskRunner并杀死它。killAllTasks
:
杀死该executor正在运行的所有task,该方法由executorBackend调用以杀死所有task,而不是直接关停JVM。stop
:
关停executor。computeTotalGcTime
:
返回executor的总GC时间。createClassLoader
:
创建task执行时的ClassLoader,并加载用户定义类。addReplClassLoaderIfNeeded
:
为REPL创建另一个ClassLoader,在用户使用REPL时,这个ClassLoader会实时读取用户定义的新类。updateDependencies
:
当SparkContext受到新的文件或者jar时调用,从Driver下载缺少的包,并加载到ClassLoader, 通常是当SparkContext.addJar被调用的情况。reportHeartBeat
:
向Driver报告活跃task的状态,Executor会将各个task的metrics作汇总,计算更新数据量,并通过Driver RPC服务heartbeatReceiverRef报告给Driver。startDriverHeartbeater
:
启动一个任务用于报告executor心跳和部分metrics,交给heartbeater调度。
Executor.TaskRunner
TaskRunner是一个线程类,用于实际运行task,它持有ExecBackend对象可以直接向Driver报告task执行状态。
Executor.TaskRunner Variable
executorBackend
: ExecutorBackend类型,executor的后台服务类,用于向Driver报告更新task状态。taskDescription
: TaskDescription类型,任务描述。taskId
: 任务id,从taskDescription中获取taskId。taskName
: 任务名,从taskDescription获取中taskName。reasonIfKilled
: 任务被杀死的原因,该变量volatitle,只在运行时有用。threadId
: taskRunner的线程id,同样是volatitle,只在运行时有用。finished
: 该taskRunner是否已经完成。startGCTime
: task启动时JVM的GC时间。task
: 从Driver发送过来的要执行的task。
Executor.TaskRunner Method
getThreadId
:
返回threadId。isFinished
:
返回finished。kill
:
通过task的kill方法杀死task。这种kill方法属于“尽最大努力”的操作,它只是把task
的标识为killed,实际上是否起作用决定于上层的Spark代码是否还会读取这个标示位。所以更保险的方法是使用下文讲到的TaskReaper。setTaskFinishedAndClearInterruptStatus
:
将finished标识位置为true,并清理线程的中断状态。run
:
TaskRunner的核心方法,是task.run
的封装,负责了task
的上下文管理和统计信息。步骤如下:
1.初始化taskRunner
状态(threadId
、threadName
、反序列化消耗的时间、task
开始时间、cpu时间等),设置ClassLoader和序列化器。
2.初始化task
状态,更新task
依赖,反序列化task
对象。
3.执行task
,调用task.run
,每次视为task
的一个attempt。执行完释放资源,包括blockManager
上的锁以及在taskMemoryManager
上为该task attempt分配的内存。如果用户代码有报错,taskRunner会将其打印到错误日志。该步骤如果没有异常则转步骤4,否则跳到步骤5。
4.计算task metrics统计信息,并将计算结果序列化后返回给Driver,更新task
的状态为成功。
5.异常处理,根据异常模式匹配分为几种情况:FetchFailedException
: 顾名思义,该异常表示有某些block无法从其他机器上获取。这种情况下Spark会将该task中断并标记为Failed。TaskKilledException
: 检测到task已经被杀死,将该task中断并标记为Killed。这种情况在用户通过YARN等集群管理器杀死任务时很常见。InterruptedException
: Executor本身被中断,将该task中断并标记为Killed。这种情况可能发生在抢占式的资源调度池,任务占用了本不属于自己的资源,而这些资源由于新的任务进入资源池而被收回。CommitDeniedException
: 无法提交task运行结果。这种异常还没见过。Throwable
: 不属于上面任何情况的异常,这时taskRunner
会判断是否属于fatal,若是则将task
的metrics和异常信息序列化后报告给Driver,然后通过uncaughtExceptionHandler
来停止该executor。但如果恰好这时executor处于shut down中,该异常不会报告给Driver,因为有可能和用户设置的shutdown钩子冲突。
6.从
runningTasks
映射中删除该task。
Executor.TaskReaper
直译为任务收割器,实现了Runnable接口,负责杀死或者取消任务。这是从Spark 2.1.1和Spark 2.2.0开始引入的特性,用于修复任务的杀死和取消机制的不可靠问题。因为task的杀死和取消机制是被动的,task.kill
方法只是将killed标识置为true,但是具体什么时候task才停止决定于task是否interruptable以及task的运行是否会再次读取killed标识并作出反应。因此,有可能用户从监控页面看到task已经被杀死,而其实还有僵尸task一直占用该executor。TaskReaper针对上述问题提供了主动杀死task的方法:它会一直监视已经标记为killed的task,直到该task退出。另外,TaskReaper也提供超时机制,若task超过约定时间没有退出,则直接杀死JVM以释放资源。
Executor.TaskReaper Varibale
taskRunner
: 正在执行该需要杀死的task的TaskRunner实例。interruptThread
: 是否中断线程。reason
: 杀死任务的原因。taskId
: 要杀死的taskId。killPollingIntervalMs
: taskReaper轮询的时间间隔,可以通过spark.task.reaper.pollingInterval
来设置,默认10s。killTimeoutMs
: task退出的超时时长,可以通过spark.task.reaper.killTimeout
来设置,默认-1,即不设置超时。takeThreadDump
: 是否需要对task进行threadDump,可以通过spark.task.reaper.threadDump
配置,默认为true。
Executor.TaskReaper Method
run
:
不断轮询检测task状态,直到task退出。步骤如下:
- 调用taskRunner.kill常规方法来杀死task。
- 轮询检测taskRunner状态。若未完成则等待(wait)一个轮询周期,意味着在这期间任何其他对takRunner的调用都会被阻塞;若已经完成,则如果需要的话进行threadDump。
- 同步更新taskReaperForTask映射,将该taskId对应的Entry移除。
总结
Executor类只负责核心的与task运行紧密相关的操作,其他服务类的模块比如Block管理、内存管理都封装在SparkEnv中,会在用到时通过全局变量SparkEnv获取,符合高内聚低耦合的设计原则,因此可以兼容多种集群模式。Executor并未直接与其他进程通信,相关的操作是由ExecutorBackend类在后台通过RPC服务完成的。
本文是原创文章,转载请注明:时间与精神的小屋 - Spark源码学习笔记(二):Executor运行机制