Spark中的JOIN机制
- 1、Hash Join概述
- 2、影响JOIN的因素
- 3、Spark中的JOIN机制
- 3.1、Shuffle Hash Join
- 3.2、Broadcast Hash Join
- 3.3、Sort Merge Join
- 3.4、Cartesian Product Join
- 3.5、Broadcast Nested Loop Join
- 4、Spark中的JOIN策略
- 5、Spark JOIN机制与策略总结
- 5.1、Spark中的JOIN机制与策略总结
- 5.2、Broadcast Hash Join与Broadcast Nested Loop Join的区别
1、Hash Join概述
Apache Spark共提供了五种JOIN机制,其中常用的有三种:Shuffle Hash Join、Broadcast Hash Join及Sort Merge Join,它们都基于Hash Join,只不过需要在Hash Join前后进行Shuffle、Broadcast或Sort
实际上,Hash Join算法来自于传统数据库,而Shuffle、Broadcast和Sort是大数据(数据仓库)在分布式场景下两者结合的产物。因此,我们也说大数据(数据仓库)是由传统数据库发展而来的
通常情况下,Hash Join使用两个表中较小的表在内存中建立以Join Key为Key的哈希/散列表(Hash Table),然后扫描较大的表,同样对大表Join Key进行Hash后探测哈希/散列表,找出与哈希/散列表匹配的行
Hash Join主要分为两个阶段:建立阶段(Build Phase)和探测阶段(Probe Phase)
- Bulid Phase:较小的表被构建成以Join Key为Key的Hash Table,较小的表也称Build Table
- Probe Phase:扫描较大表的行并计算Join Key的哈希值,与Build Table哈希表比对,若相同则进行JOIN,较大的表也称Probe Table
值得注意的是,Hash Join适用于较小的表完全可以放于内存的情况,如果表较大,无法构造在内存中,则优化器会将它分成若干个Partition,将不能放入内存的部分写入磁盘,此时会多一个写的代价,I/O性能差
Apache Spark将参与JOIN的两张表抽象为流式遍历表(StreamIter)和查找表(BuildIter),通常StreamIter为大表,BuildIter为小表,这是由Spark根据JOIN策略自动决定的。对于每条来自StreamIter的记录,都要去BuildIter中查找匹配的记录
2、影响JOIN的因素
影响JOIN性能和JOIN结果的因素主要包括:数据集的大小、JOIN的连接条件以及JOIN的连接类型
1)数据集的大小
参与JOIN的数据集大小会直接影响JOIN操作的执行效率。同样,也会影响Spark JOIN机制的选择
2)JOIN的连接条件
JOIN的条件涉及字段之间的逻辑比较关系。根据JOIN的条件,JOIN可分为两大类:等值连接和非等值连接
- 等值连接:涉及字段之间一个或多个必须同时满足的相等条件(运算连接符为
=
) - 非等值连接:涉及字段之间一个或多个必须同时满足的非相等条件(运算连接符不为
=
)
3)JOIN的连接类型
在输入数据集的记录之间应用连接条件之后,JOIN类型会影响JOIN操作的结果。Spark主要有以下几种JOIN类型:
- 内连接(Inner Join):返回连接条件都匹配的记录
- 外连接(Outer Join):分为左外连接、右外链接和全外连接
- 交叉连接(Cross Join):交叉连接返回两个表的笛卡尔乘积
- 半/反连接(Semi/Anti Join):返回匹配/不匹配右表的左表数据
3、Spark中的JOIN机制
Spark共提供了五种JOIN机制来执行具体的JOIN操作:
- Shuffle Hash Join
- Broadcast Hash Join
- Sort Merge Join
- Cartesian Product Join
- Broadcast Nested Loop Join
3.1、Shuffle Hash Join
当要JOIN的表数据量较大时,一般选择Shuffle Hash Join。这样可以将大表按照Join Key进行重分区,保证每个相同的Join Key都发送到同一个分区中
Shuffle Hash Join的基本步骤如下:
- Shuffle阶段:对于两张参与JOIN的表,分别根据Join Key进行重分区,该过程会涉及Shuffle,保证Join Key值相同的记录被分到同一个分区
- Hash Join阶段:对于每个Shuffle Read分区,会将分区中较小表(BuildIter)构建成一个Hash Table放入内存,然后根据Join Key与较大的表记录进行匹配
Shuffle Hash Join的条件与特点如下:
- BuildIter的总体估计大小超过
spark.sql.autoBroadcastJoinThreshold
(default 10MB)设置的值,即不满足Broadcast Hash Join条件 - 仅支持等值连接,且参数
spark.sql.join.preferSortMergeJoin
(default true)需要设置为false
,即不满足Sort Merge Join条件(Join Key不需要排序) - 支持除了全外连接(Full Outer Join)外的所有JOIN类型,对于Full Outer Join,需要建立双向Hash Table,代价太大,因此Full Outer Join默认都是基于Sort Merge Join来实现
- 需要对小表构建Hash Table,属于内存密集型操作,如果构建Hash Table的一侧数据较大,可能会造成数据倾斜和OOM
- StreamIter的大小是BuildIter的三倍以上
Shuffle Hash Join:Shuffle会带来较多的网络I/O开销,因此性能较差。同时,在Executor的内存使用方面,如果Executor的数量足够多,每个分区处理的数据量可以控制到比较小