首先我们观察下上面的stage5, Task MaxTime=2.4Min, 但是stage5的整体耗时竟然可以达到55Min,
其实分区1000, 300个executor, 按照最大的TaskTime=2.4Min来估算所有Task运行完成时间, 那么时间应该是- 2.4Min * 3 + 2.4Min = 9.6Min
也就是最慢也就跑10分钟就能跑完整个stage, 但却跑了整整55分钟。
这里我们通过观察executor维度指标来观察, 发现大量的task的被集中到同一个executor上面,所以问题的根本是task的分配不均匀, 虽然后期spark的推理优化kill掉部分在executor运行较慢的task, 但是kill掉不合理的task不多, 整个task的分布还是很不合理。
这里主要跟一个计算概念有关, 就是让"数据离计算越近越好", 总之就是
- memory > local file > same rack file > any file
- 内存 > 本地文件 > 同一个机架的文件 > 可能在任何地方的文件
所以Spark在task进行分配的时候, 也是秉承着这个原则, 但使用的术语不太一样, 这里就不细说了。
可以在task的维度指标的locality level来查看task使用的数据来源。
解决该问题的思路, 就是使在分配任务的时候拒绝遵循"数据离计算越近越好", 让task合理分配到不同的机器, spark.locality.wait表示在放弃并在非本地节点上启动数据本地任务之前,等待启动数据本地任务需要多长时间,默认值为3s。
当初我看得到3s的时候, 猜这个启动数据本地任务应该是一个lazy的操作, 只是把任务添加到线程池的任务队列, 不然不会产生分配不均匀, 这个观点待考究。
这里直接将spark.locality.wait=0, 重新跑了一遍该作业, 查看了stage5的运行时间由- 55Min -> 3 Min 降低了52Min
其实spark提供了更加细致的调控数据源远近的waitTime:
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
上面这三个的默认值都是spark.locality.wait