读源码是大数据开发工程师的自我修养之一,趁最近项目进度比较快,我决定抽时间来系统学习一下代码质量广受好评的Spark源码。虽然工作中使用的是Apache 发行版的1.6.1,所以读源码时还是保持一致。好项目都是高内聚低耦合,这就方便逐个组件剖析。先从core包开始,这次要讲的是org.apache.spark.broadcast
。
在Spark应用启动时,Driver需要将序列化之后的task发至executor上。官方建议task的大小应该在20KB以下,以保证Application可以快速启动。如果task要使用比较大的变量,比如一个维度表,这时就会用到Spark的broadcast机制。
Broadcast机制允许用户在每个executor上缓存一份只读变量,以避免将变量与task代码一起传输。从物理上看,Boardcast的目标是将变量分发到所有工作节点的BlockManager上。
深入源码,首先看下org.apache.spark.broadcast
的内容:
显然主要有三种对象Manager、Factory和Broadcast,Manager负责Broadcast的全局管理,Factory负责创建或取消Broadcast,Broadcast为实际的一次广播操作。Factory和Broadcast都有http和torrent两种实现方式。
BroadcastManager
BroadcastManager是BroadcastFactory的封装,负责了BroadcastFactory从初始化到关停的整个生命周期。初始化阶段,它从配置中读取spark.broadcast.factory
属性,这个属性决定了实际使用的broadcast是http还是torrent,默认为org.apache.spark.broadcast.TorrentBroadcastFactory
;在运行期间,它会调用BroadcastFactory的newBroadcast
方法和unbroadcast
方法来控制变量的广播,每次广播有递增的唯一ID;最后它还负责了BroadcastFactory的关闭。
BroadcastFactory
BroadcastFactory是一个trait,核心的两个方法是newBroadcast
和unbroadcast
,前者用于广播一个新的变量,后者用于删除一个已有的变量。HttpBroadcastFactory
和TorrentBroadcastFactory
只是通过简单调用Broadcast类的函数来实现这些方法.
Broadcast
Broadcast同样是一个trait,核心方法是persist
、unpersist
以及value
,分别用于缓存一个变量、从缓存删除一个变量和获取变量的值。同时它还维护了一个volatile变量_isValid
以判断当前该变量是否可操作,对变量的操作会先访问这个值。如果一个线程操作一个变量时发现它已经被删除,便会抛出一个SparkException。
HttpBroadcast
HttpBroadcast会在Driver创建一个HttpServer(底层为Jetty),并将变量写到本地临时目录以防Driver进程崩溃。Executor在接收到task后向Driver请求该变量,接收到变量之后将其反序列化并存储在该节点的BlockManager中。由于每个节点的广播变量只需要本地使用,所以这些block不需要通知BlockManagerMaster。
当Application需要删除该广播变量时,HttpBroadcast会调用unpersist
方法,通过Application的BlockManagerMaster通知所有节点的BlockManager删除对应id的广播变量。这个操作可以设置是否删除Driver的广播变量,以避免Driver后续还需要使用该变量。
此外,HttpBroadcast还会启动一个MetaDataCleaner定时清除过期的广播变量。过期的时长可以通过spark.cleaner.ttl.HTTP_BROADCAST
来设置,默认为-1表示不清理。
TorrentBroadcast
TorrentBroadcast采用比特流传输协议来分发广播变量,这意味着整个过程是P2P的,executor除了作为客户端请求变量,在获得变量之后它也会成为server帮助分发,这大大减小了分发的速度对Driver节点带宽的依赖。
首先Driver会将变量序列化并分成一个个block,存储在Driver的BlockManager。每个executor开始工作时,首先向本地的BlockManager请求该变量。如果不存在,则向Driver或其他executor请求。一旦获得了变量,则放到BlockManager上并向BlockManagerMaster报告,以便后续节点获取。
删除广播变量的实现与HttpBroadcast相似,都是通过BlockManagerMaster删除各个节点BlockManager上的相应变量。
小结
Spark变量广播分为HttpBroadcast和TorrentBroadcast。HttpBroadcast较早前用于Standalone集群模式,全部变量由Driver分发,所以有单点问题。TorrentBroadcast是更为高效的广播机制,分发能力随着接收节点增加呈指数级别增长,在大型集群中的分发效率远高于HttpTorrent。在最新版本2.2,HttpBroadcast完全被TorrentBroadcast取代。
本文是原创文章,转载请注明:时间与精神的小屋 - Spark源码学习笔记(一):Broadcast机制