Join操作是数据处理中必不可少的一部分,传统关系型数据库经历多年对高效率join的研究和技术沉淀,已经非常成熟。然而在分布式环境下,数据存在不同程度的异构并缺乏索引等机制,使得传统的join优化技术难以应用。更重要的join是一个全局操作,一台物理机器需要大量网络IO获取join计算需要的记录,而网络却正是分布式计算最为珍贵的资源。以上种种原因导致分布式join与传统关系型join有着较大的差别(当然这仅仅是目前的状况,分布式计算也在不断借鉴关系型数据库的经验)。
分布式计算中的join大部分基于MapReduce计算模型,根据join操作实现的阶段可以分为reduce join和map join两大类,其余都是在该基础上进行优化。
Reduce Join
Reduce join是更为通用和常见的join模式,相对地,计算效率会比较一般,毕竟泛化能力和效率总是鱼和熊掌的关系。
Reduce join的核心思想是通过shuffle将具有相同join key的记录拉取到同一个reducer,map端只通过partitioner起到一个分类的作用,其他计算都在reduce端完成。不同于一般的MapReduce只有一个输入数据集,reduce join以两个表作为输入。Map端需要分别将数据转换为(joinKey,record)的形式输出,为了在reduce端区分两个表的记录,最好可以在record中加入一个标识字段,这个字段可以在分类完成之后丢弃。
下面以连接tbl_user(user_id, user_name)和tbl_tel(user_id, tel)两个表为例:
Reduce join的瓶颈在于需要shuffle传输所有的记录,在数据集很大的情况下这个消耗无疑是巨大的。
Map Join
Map join将join操作本地化,效率更高。但适用场景没有reduce join广,要满足两个条件:随机访问一个数据集,同时可以遍历另外一个数据集。
常见的案例是join一个大表和一个小表,可以直接将小表广播到所有map端,并以哈希表方式加载到内存。在map端即可完成所有的操作,所以是map-only作业,避免了shuffle阶段,大大提高了效率。
使用Map Join要注意的一点衡量广播小表的代价是否值得。在标准情况下,一个map处理一个block的数据,通常是64M或128M。但是如果小表有1G那么大,为了处理64M或128M的数据加载1G额外数据,可能得不偿失。
HBase Map Join
为了随机访问小表,普通的map join的策略是广播小表并加载到map端内存,但在有时广播的成本太高,而且当两个表都是大表时普通map join就有心无力了。那么有没有别的方式呢?答案当然是有,比较好的方案就是将其中一个表存储到HBase中,HBase表的row key自然就是join key了。
这样相当于为map端提供了统一的随机访问服务,而不要每个map端单独建立一个哈希表。
Hive Bucket Map Join
当数据可以提前确定join key时,Hive的bucket map join就可以派上用场了。Join的难点就在于对应key的数据是散乱分布的,要从多个的节点获取,那么如果一开始存储的时候就将两个表的数据按join key分片进行存储,map端的key对应的数据只需要从其他某个或少量几个节点批量获取,效率不就高多了?
Hive基于这个想法,在建表的时候支持通过指定clustered by (col_name,xxx ) into number_buckets buckets 关键字进行哈希分区。当两个表的join key都是clustered by的字段时,就可以通过设置参数
hive.optimize.bucketmapjoin = true
来命令hive执行bucket map join了。
这种方式其实本质上和普通的map join是一样的,只不过Hive利用了元数据优化了数据的存储分布,使得每个map的数据都是属于同一个join key,并使得拉数据变为批量操作,减少了网络IO。
参考文献
1.NickDimiduck, Amandeep Khurana. HBase实战[M]. 谢磊, 译. 北京: 人民邮电出版社, 2013.
2.张俊林. 大数据日知录: 架构与算法[M]. 北京: 电子工业出版社, 2014.
3.Hive join详解, cnblogs.
4.Edward Capriolo, Dean Warmpler, Jason Ruberglen. Hive编程指南[M]. 曹坤, 译. 北京: 人民邮电出版社, 2013.