1 数据倾斜现象
1、现象
绝大多数task任务运行速度很快,但是就是有那么几个task任务运行极其缓慢,慢慢的可能就接着报内存溢出的问题。
2、原因
数据倾斜一般是发生在shuffle类的算子,比如distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup等,涉及到数据重分区,如果其中某一个key数量特别大,就发生了数据倾斜。
2 数据倾斜大key定位
从所有key中,把其中每一个key随机取出来一部分,然后进行一个百分比的推算,这是用局部取推算整体,虽然有点不准确,但是在整体概率上来说,我们只需要大概就可以定位那个最多的key了
执行:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.join.SampleKeyDemo spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
3 单表数据倾斜优化
为了减少shuffle数据量以及reduce端的压力,通常Spark SQL在map端会做一个partial aggregate(通常叫做预聚合或者偏聚合),即在shuffle前将同一分区内所属同key的记录先进行一个预结算,再将结果进行shuffle,发送到reduce端做一个汇总,类似MR的提前Combiner,所以执行计划中 HashAggregate通常成对出现。
1、适用场景
聚合类的shuffle操作,部分key数据量较大,且大key的数据分布在很多不同的切片。
2、解决逻辑
两阶段聚合(加盐局部聚合+去盐全局聚合)
3、案例演示
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.skew.SkewAggregationTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
4 Join数据倾斜优化
4.1 广播Join
1、适用场景
适用于小表join大表。小表足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。
2、解决逻辑
在小表join大表时如果产生数据倾斜,那么广播join可以直接规避掉此shuffle阶段。直接优化掉stage。并且广播join也是Spark Sql中最常用的优化方案。
3、案例演示
2.2.2中的PartitionTuning案例关闭了广播join,可以看到数据倾斜
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.skew.SkewMapJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
4.2 拆分大key 打散大表 扩容小表
1、适用场景
适用于join时出现数据倾斜。
2、解决逻辑
1)将存在倾斜的表,根据抽样结果,拆分为倾斜key(skew表)和没有倾斜key(common)的两个数据集。
2)将skew表的key全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍,得到new表)。
3)打散的skew表 join 扩容的new表
union
Common表 join old表
以下为打散大key和扩容小表的实现思路
1)打散大表:实际就是数据一进一出进行处理,对大key前拼上随机前缀实现打散
2)扩容小表:实际就是将DataFrame中每一条数据,转成一个集合,并往这个集合里循环添加10条数据,最后使用flatmap压平此集合,达到扩容的效果.
3、案例演示
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 2 --executor-memory 6g --class com.atguigu.sparktuning.skew.SkewJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
4.3 开启AQE
1)spark.sql.adaptive.skewJoin.enabled :是否开启倾斜join检测,如果开启了,那么会将倾斜的分区数据拆成多个分区,默认是开启的,但是得打开aqe。
2)spark.sql.adaptive.skewJoin.skewedPartitionFactor :默认值5,此参数用来判断分区数据量是否数据倾斜,当任务中最大数据量分区对应的数据量大于的分区中位数乘以此参数,并且也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes参数,那么此任务是数据倾斜。
3)spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes :默认值256mb,用于判断是否数据倾斜
4)spark.sql.adaptive.advisoryPartitionSizeInBytes :此参数用来告诉spark进行拆分后推荐分区大小是多少。
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
如果同时开启了spark.sql.adaptive.coalescePartitions.enabled动态合并分区功能,那么会先合并分区,再去判断倾斜,将动态合并分区打开后,重新执行:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
修改中位数的倍数为2,重新执行:
spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 3 --executor-cores 4 --executor-memory 2g --class com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin