Data Locality
```scala
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}
```
- PROCESS_LOCAL: the data to be processed is in the same local process as the task, i.e., the data and the task live in the same Executor JVM. This happens when the RDD's data has been cached; no network transfer is needed, making it the best locality level (but the data has to be cached first).
- NODE_LOCAL: either (1) the data and the task are in different executors on the same node, or (2) the data sits in HDFS on the same node as the task. The data must cross process boundaries, so this is slightly slower than PROCESS_LOCAL.
- NO_PREF: the data is equally fast to access from anywhere, which effectively means no data locality; this typically applies when reading from an external data source.
- RACK_LOCAL: the data and the task are on different nodes within the same rack, so the data travels over the network; slower than NODE_LOCAL.
- ANY: the data and the task may be anywhere in the cluster. This is the worst case, and its appearance usually warrants investigating the cause.
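Since `TaskLocality` is a plain Scala `Enumeration`, the levels are ordered by declaration, and `isAllowed(constraint, condition)` simply checks `condition <= constraint`: a better locality always satisfies a looser constraint. A minimal, self-contained sketch (the enum is re-declared locally for illustration; only `isAllowed` mirrors the real code):

```scala
// Simplified local re-declaration of TaskLocality, for illustration only.
object TaskLocalityDemo extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // Enumeration values compare by declaration order.
  def isAllowed(constraint: Value, condition: Value): Boolean =
    condition <= constraint
}

object LocalityCheck extends App {
  import TaskLocalityDemo._
  println(isAllowed(constraint = NODE_LOCAL, condition = PROCESS_LOCAL)) // true: better locality is allowed
  println(isAllowed(constraint = NODE_LOCAL, condition = RACK_LOCAL))    // false: worse locality is rejected
  println(isAllowed(constraint = ANY, condition = RACK_LOCAL))           // true: ANY accepts every level
}
```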
TaskSetManager
TaskSetManager.scala
```scala
/** Add a task to all the pending-task lists that it should be on. */
private[spark] def addPendingTask(
    index: Int,
    resolveRacks: Boolean = true,
    speculatable: Boolean = false): Unit = {
  // A zombie TaskSetManager may reach here while handling failed task.
  if (isZombie) return

  val pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasks
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index

    if (resolveRacks) {
      sched.getRackForHost(loc.host).foreach { rack =>
        pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    pendingTaskSetToAddTo.noPrefs += index
  }

  pendingTaskSetToAddTo.all += index
}
```
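The core of `addPendingTask` is an indexing trick: the same task index is appended to every pending list (`forExecutor`, `forHost`, `forRack`) that matches one of the task's preferred locations, so a later resource offer can find matching tasks with a direct map lookup. A minimal sketch of that pattern, with the maps declared standalone rather than inside Spark's real pending-task structure:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Simplified stand-ins for the forExecutor/forHost/forRack maps, for illustration.
object PendingTasksDemo extends App {
  val forExecutor = new HashMap[String, ArrayBuffer[Int]]
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  val forRack = new HashMap[String, ArrayBuffer[Int]]

  // Suppose task 3 prefers executor "exec-1" on host "node-a" in rack "rack-1":
  // the one index lands in all three maps, keyed by each location kind.
  val index = 3
  forExecutor.getOrElseUpdate("exec-1", new ArrayBuffer) += index
  forHost.getOrElseUpdate("node-a", new ArrayBuffer) += index
  forRack.getOrElseUpdate("rack-1", new ArrayBuffer) += index

  println(forHost("node-a")) // ArrayBuffer(3)
}
```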
TaskLocation
TaskLocation.scala
```scala
/**
 * A location that includes both a host and an executor id on that host.
 */
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}

/**
 * A location on a host.
 */
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

/**
 * A location on a host that is cached by HDFS.
 */
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}

private[spark] object TaskLocation {
  // We identify hosts on which the block is cached with this prefix. Because this prefix contains
  // underscores, which are not legal characters in hostnames, there should be no potential for
  // confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
  val inMemoryLocationTag = "hdfs_cache_"

  // Identify locations of executors with this prefix.
  val executorLocationTag = "executor_"
}
```
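Because both tags contain underscores, which are illegal in hostnames, a tagged string can never collide with a plain host name, so the three location kinds can share one string representation. A minimal sketch (the two helper functions are illustrative stand-ins for the `toString` methods above, not Spark API):

```scala
// Illustrative stand-ins for the toString encodings of the TaskLocation subclasses.
object TaskLocationTagsDemo extends App {
  val executorLocationTag = "executor_"
  val inMemoryLocationTag = "hdfs_cache_"

  def executorCacheLocation(host: String, executorId: String): String =
    s"$executorLocationTag${host}_$executorId"

  def hdfsCacheLocation(host: String): String = inMemoryLocationTag + host

  println(executorCacheLocation("node-a", "exec-1")) // executor_node-a_exec-1
  println(hdfsCacheLocation("node-a"))               // hdfs_cache_node-a
}
```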
Tasks are scheduled in locality order: PROCESS_LOCAL first, then NODE_LOCAL, NO_PREF, RACK_LOCAL, and finally ANY.
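A minimal sketch of what scheduling in that order means in practice: walk the levels from best to worst and stop at the first level that still has pending work. The level list mirrors `TaskLocality`'s declaration order, while `pendingAtLevel` is a hypothetical stand-in for the per-level pending lists, not Spark's actual dequeue logic:

```scala
object LocalityOrderDemo extends App {
  // Levels in TaskLocality's declaration order, best to worst.
  val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "NO_PREF", "RACK_LOCAL", "ANY")

  // Hypothetical pending task indices per level for one resource offer.
  val pendingAtLevel = Map("NODE_LOCAL" -> Seq(4, 7), "ANY" -> Seq(9))

  // Take the first (best) level that still has pending tasks.
  val chosen = levels.iterator
    .map(level => level -> pendingAtLevel.getOrElse(level, Seq.empty[Int]))
    .find { case (_, pending) => pending.nonEmpty }

  println(chosen) // Some((NODE_LOCAL,List(4, 7)))
}
```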
getCacheLocs
DAGScheduler.scala
```scala
private[scheduler]
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
  cacheLocs.synchronized {
    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
    if (!cacheLocs.contains(rdd.id)) {
      // Note: if the storage level is NONE, we don't need to get locations from block manager.
      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
        IndexedSeq.fill(rdd.partitions.length)(Nil)
      } else {
        val blockIds =
          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
        blockManagerMaster.getLocations(blockIds).map { bms =>
          bms.map(bm => TaskLocation(bm.host, bm.executorId))
        }
      }
      cacheLocs(rdd.id) = locs
    }
    cacheLocs(rdd.id)
  }
}
```
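Note the memoization: because `getCacheLocs` is called O(num tasks) times, the block-manager lookup runs once per RDD id and the result is stored in `cacheLocs`. A minimal sketch of that pattern (`lookupBlockLocations` is a hypothetical stand-in for the `blockManagerMaster.getLocations` query):

```scala
import scala.collection.mutable.HashMap

object CacheLocsDemo extends App {
  // rdd.id -> per-partition location lists (host names stand in for TaskLocation).
  val cacheLocs = new HashMap[Int, IndexedSeq[Seq[String]]]

  // Hypothetical stand-in for the expensive block-manager query.
  def lookupBlockLocations(rddId: Int, numPartitions: Int): IndexedSeq[Seq[String]] = {
    println(s"expensive lookup for RDD $rddId") // runs only once per RDD id
    IndexedSeq.fill(numPartitions)(Nil)
  }

  def getCacheLocs(rddId: Int, numPartitions: Int): IndexedSeq[Seq[String]] =
    cacheLocs.synchronized {
      if (!cacheLocs.contains(rddId)) {
        cacheLocs(rddId) = lookupBlockLocations(rddId, numPartitions)
      }
      cacheLocs(rddId)
    }

  getCacheLocs(1, 4)
  getCacheLocs(1, 4) // second call hits the cache; no "expensive lookup" line
}
```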