一、概览
从start-all.sh开始捋,一直捋到Master、Worker的启动并建立通信
二、宏观描述
Master端
1、start-all.sh调用start-master.sh启动Master
2、执行org.apache.spark.deploy.master.Master中main方法
3、通过工厂模式创建RpcEnv子类NettyRpcEnv
a、创建TransportServer
b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)
c、Dispatcher将Master作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef
d、创建一个端点传递消息的MessageLoop,并将这个MessageLoop和Master绑定
e、New一个带有待处理消息的收件箱(Inbox)同时也和Master绑定
f、Inbox中有一个代码块会把OnStart方法添加到消息列表
g、调用setActive(inbox)将其标记为活动以处理OnStart消息
4、Master中的OnStart的方法被调起
5、处理来自Workers的注册信息RegisterWorker,向Workers发送注册成功信息RegisterWorkerResponse
5、检查并删除超时的Worker
Worker端
1、start-all.sh调用start-workers.sh脚本
2、start-workers.sh去workers文件中查询节点ip或host依次ssh过去启动start-worker.sh脚本
3、各个节点执行org.apache.spark.deploy.worker.Worker中main方法
4、通过工厂模式创建RpcEnv子类NettyRpcEnv
a、创建TransportServer
b、初始化TransportServer(通过ServerBootstrap去引导Netty并并绑定端口)
c、Dispatcher将Master作为一个RpcEndpoint注册到RpcEnv中并返回RpcEndpointRef
d、创建一个端点传递消息的MessageLoop,并将这个MessageLoop和Master绑定
e、New一个带有待处理消息的收件箱(Inbox)同时也和Master绑定
f、Inbox中有一个代码块会把OnStart方法添加到消息列表
g、调用setActive(inbox)将其标记为活动以处理OnStart消息
5、Worker中的OnStart的方法被调起
6、根据配置看是否启动外部ShuffleService
7、获取资源信息
8、根据Master的URL从RpcEnv中获取MasterEndpointRef
9、向所有的Master(可能会是HA的场景)发送注册信息RegisterWorker
10、接收到来自Master的RegisterWorkerResponse信息
11、向Master发送心跳Heartbeat
三、源码调用绘图
把图下载下来,放大可以看清楚
四、核心源码
脚本部分
1、start-all.sh
#启动所有spark守护进程
#在此节点上启动Master
#在conf/workers中指定的每个节点上启动一个worker# Start Master
"${SPARK_HOME}/sbin"/start-master.sh# Start Workers
"${SPARK_HOME}/sbin"/start-workers.sh
2、start-master.sh
CLASS="org.apache.spark.deploy.master.Master"
#默认端口
SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
#获取Master节点的域名
$SPARK_MASTER_HOST#启动Master守护进程
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \$ORIGINAL_ARGS
3、start-workers.sh
#Master节点默认端口
SPARK_MASTER_PORT=7077
#获取Master节点的域名
$SPARK_MASTER_HOST
#启动所有的worker
"${SPARK_HOME}/sbin/workers.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-worker.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
4、start-worker.sh
CLASS="org.apache.spark.deploy.worker.Worker"#启动几个Worker实例
$SPARK_WORKER_INSTANCES
#第一个worker的端口号。如果设置,后续的workers将递增此数字。如果未设置,Spark将找到一个有效的端口号,但不能保证模式是可预测的。
$SPARK_WORKER_PORT
#第一个worker的web界面的基本端口。后续的workers将增加这个数字。默认值为8081。
$SPARK_WORKER_WEBUI_PORT
#启动worker
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
scala代码部分
1、Master
//Master 是一个多线程安全的RpcEndpoint
private[deploy] class Master(override val rpcEnv: RpcEnv,address: RpcAddress,webUiPort: Int,val securityMgr: SecurityManager,val conf: SparkConf)extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {private val forwardMessageThread =ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")//Master端有所有worker的信息 并且有它们的 Rpc通信地址val workers = new HashSet[WorkerInfo]private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]override def onStart(): Unit = {logInfo("Starting Spark master at " + masterUrl)logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")webUi = new MasterWebUI(this, webUiPort)webUi.bind()masterWebUiUrl = webUi.webUrl//检测所有Worker的超时任务checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },0, workerTimeoutMs, TimeUnit.MILLISECONDS)}override def onStop(): Unit = {...}//HA下的Leader选举override def electedLeader(): Unit = {self.send(ElectedLeader)}//接收其他 Endpoint 的 send 消息override def receive: PartialFunction[Any, Unit] = {case ElectedLeader => Leader选举case RegisterWorker(...)=> worker 此时会携带它的资源信息 xx核 xx 内存master 会添加这个 worker 并向这个 worker send RegisterWorkerResponse 消息case RegisterApplication(description, driver) => derver端的应用注册case Heartbeat(workerId, worker) => 接收worker端的心跳case CheckForWorkerTimeOut => 检测worker的心跳......}//接收其他 Endpoint 的 ask 消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case RequestSubmitDriver(description) => 提交 driver case RequestKillDriver(driverId) => 杀掉 drivercase RequestDriverStatus(driverId) => driver 状态case RequestMasterState => master 状态case RequestExecutors(appId, requestedTotal) => 启动 executorscase KillExecutors(appId, executorIds) => 杀掉 executors......}//在workers上启动executors //返回一个数组,其中包含分配给每个worker的核心数。//有两种启动executors 的模式。//第一种方法试图将应用程序的executors 分散到尽可能多的worker进程上, 更适合数据局部性,是默认设置//第二种方法则相反(即在尽可能少的worker进程中启动它们)。//分配给每个executor的核心数量是可配置的。当显式设置此选项时,如果工作进程有足够的内核和内存,则可以在同一worker进程上启动来自同一应用程序的多个executor。否则,//默认情况下,每个executor都会抓取worker上可用的所有内核,在这种情况下,在一次单独的计划迭代中,每个应用程序只能在每个worker上启动一个executor。//请注意,当未设置“spark.executor.cores”时,我们仍然可以在同一个worker上从同一个应用程序启动多个executor。//假设appA和appB都有一个executor在worker1上运行,并且appA.cores>0,则appB完成并释放worker1上的所有内核,因此对于下一个计划迭代,appA启动一个新的executor,抓取worker1上所有空闲的内核,因此我们从运行在worker1的appA中获得多个executor。private def scheduleExecutorsOnWorkers(...){......}//在workers上启动executorsprivate def startExecutorsOnWorkers(): Unit = {.......}//分配worker上的资源给 1个 或多个 executorprivate def allocateWorkerResourceToExecutors(){.......}//在等待的应用程序之间安排当前可用的资源。每当有新应用加入或资源可用性发生变化时,都会调用此方法。private def schedule(): Unit = {.......}......}private[deploy] object Master extends Logging {val SYSTEM_NAME = "sparkMaster"val ENDPOINT_NAME = "Master"def main(argStrings: Array[String]): Unit = {val args = new MasterArguments(argStrings, conf)//启动Master 并返回一个 tuple 3val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)rpcEnv.awaitTermination()}def startRpcEnvAndEndpoint(...){//启动一个RpcEnvval rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)//将Master Endpoint 注册到 RpcEnv 中val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)}}
2、Worker
//Worker 是一个多线程安全的RpcEndpoint
private[deploy] class Worker(...){//Worker 端也有 Master 端的RpcEndpointRefprivate var master: Option[RpcEndpointRef] = Noneprivate var activeMasterUrl: String = ""//该Worker上有哪些 app driver executor val drivers = new HashMap[String, DriverRunner]val executors = new HashMap[String, ExecutorRunner]val appDirectories = new HashMap[String, Seq[String]]override def onStart(): Unit = {//根据配置启动外部ShuffleServicestartExternalShuffleService()//设置资源setupWorkerResources()//向所有Master注册自己registerWithMaster()......}//使用个新的Masterprivate def changeMaster(masterRef: RpcEndpointRef, uiUrl: String,masterAddress: RpcAddress): Unit = {......}//向所有的Master注册private def tryRegisterAllMasters(): Array[JFuture[_]] = {......}//向Master发送注册消息private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {......}//处理Master发送的注册响应private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {msg match {case RegisteredWorker(...) => 注册成功case RegisterWorkerFailed(message) => 注册失败case MasterInStandby => Master 还没准备好}}//处理来自其他 Endpoint 的 send 消息override def receive: PartialFunction[Any, Unit] = synchronized {.......}//处理来自其他 Endpoint 的 ask 消息override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {.......}//向当前主机发送消息。如果尚未在任何主机上成功注册,则消息将被丢弃。private def sendToMaster(message: Any): Unit = {}......}private[deploy] object Worker extends Logging {val SYSTEM_NAME = "sparkWorker"val ENDPOINT_NAME = "Worker"def main(argStrings: Array[String]): Unit = {val args = new WorkerArguments(argStrings, conf)//启动 RpcEnv 并将自己放进去val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,args.memory, args.masters, args.workDir, conf = conf,resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))......rpcEnv.awaitTermination()}def startRpcEnvAndEndpoint(......){//一个节点可以启动多个worker 默认是1个val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr, resourceFileOpt))rpcEnv}}
3、NettyRpcEnv
//父类是RpcEnv
//RPC环境。[[RpcEndpoint]]需要在[[RpcEnv]]中注册一个名称来接收消息。
//然后[[RpcEnv]]将处理从[[RpcEndpointRef]]或远程节点发送的消息,并将其传递给相应的[[RpcEndpoint]]s。
//对于[[RpcEN]]捕获的未捕获异常,[[Rpcen]]将使用[[RpcCallContext.sendFailure]]将异常发送回发送方,或者在没有此类发送方或“NoterializableException”的情况下记录它们。
//[[RpcEnv]]还提供了一些方法来检索[[RpcEndpointRef]]的给定名称或uri
//它是各种角色通信的基础,是基于Netty的
private[netty] class NettyRpcEnv(...) extends RpcEnv {//分发器private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)//基于Netty 其初始化时会通过bootstrap引导启动Netty@volatile private var server: TransportServer = _//[[RpcAddress]]和[[发件箱]]的map。当我们连接到远程[[RpcAddress]]时,我们只需将消息放入其[[Outbox]]即可实现非阻塞的“send”方法。private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()//启动这个NettyRpcEnvdef startServer(bindAddress: String, port: Int): Unit = {val bootstraps: java.util.List[TransportServerBootstrap] =if (securityManager.isAuthenticationEnabled()) {java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))} else {java.util.Collections.emptyList()}//创建一个将尝试绑定到特定主机和端口的Netty服务器server = transportContext.createServer(bindAddress, port, bootstraps)//注册一个RpcEndpointVerifier 用于给远程RpcEndpoint查询是否存在对应的RpcEndpointdispatcher.registerRpcEndpoint(RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))}//设置一个RpcEndpointoverride def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {dispatcher.registerRpcEndpoint(name, endpoint)}//往Outbox中放信息(且指明了收件人)private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {......}//向远程 RPC endpoint 发送信息 其实就是放到 Outbox中 private[netty] def send(message: RequestMessage): Unit = {......}//异步发送消息,设置了超时时间,需要等待响应private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {......}......
}//RpcEndpointRef的NettyRpcEnv版本。
//此类的行为因创建位置而异。在“拥有”RpcEndpoint的节点上,它是一个围绕RpcEndpointAddress实例的简单包装器。
//在接收引用序列化版本的其他机器上,行为会发生变化。实例将跟踪发送引用的TransportClient,以便通过客户端连接向端点发送消息,而不需要打开新的连接。
//此引用的RpcAddress可以为空;这意味着ref只能通过客户端连接使用,因为承载端点的进程不监听传入连接。这些引用不应与第三方共享,因为它们将无法向端点发送消息。
private[netty] class NettyRpcEndpointRef(...){.......
}//从发送方发送到接收方的消息。
private[netty] class RequestMessage(...){.......
}//将传入的RPC分派到已注册的端点。
//处理程序跟踪与其通信的所有客户端实例,以便RpcEnv在向客户端端点(即不监听传入连接,而是需要通过客户端Socket联系的端点)发送RPC时知道要使用哪个“TransportClient”实例。
//事件是按每个连接发送的,因此,如果客户端打开多个到RpcEnv的连接,将为该客户端创建多个连接/断开连接事件(尽管具有不同的“RpcAddress”信息)。
private[netty] class NettyRpcHandler(...}.......
}//通过工厂创建NettyRpcEnv
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {def create(config: RpcEnvConfig): RpcEnv = {......}
}
4、TransportServer
public class TransportServer implements Closeable {//Netty服务端的引导,用于创建ServerChannelprivate final List<TransportServerBootstrap> bootstraps;private ServerBootstrap bootstrap;//构造器public TransportServer(...){...init(hostToBind, portToBind);...}//初始化该TransportServer private void init(String hostToBind, int portToBind) {//IO模式 NIO 、 EPOLL 模式是NIO //如果是Linux操作系统可以在spark-defaults.conf中设置spark.rpc.io.mode = EPOLL 来实现IOMode ioMode = IOMode.valueOf(conf.ioMode());//基于Netty中Reactor模型的启动EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,conf.getModuleName() + "-boss");EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),conf.getModuleName() + "-server");bootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NettyUtils.getServerChannelClass(ioMode)).option(ChannelOption.ALLOCATOR, pooledAllocator).option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS).childOption(ChannelOption.ALLOCATOR, pooledAllocator);//绑定端口启动一个Netty服务端channelFuture = bootstrap.bind(address);}}
5、TransportClient
//客户端,用于获取预先协商的流的连续块。此API旨在实现大量数据的高效传输,这些数据被分解为大小从数百KB到几MB不等的块。
//请注意,虽然此客户端处理从流(即数据平面)中提取块,但流的实际设置是在传输层范围之外完成的。提供方便的方法“sendRPC”来实现客户端和服务器之间的控制平面通信,以执行此设置。
//例如:典型的工作流程是这样的:
//client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
//client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
//client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
//......
//client.sendRPC(new CloseStream(100))//使用TransportClientFactory构造TransportClient的实例。单个TransportClient可用于多个流,但任何给定的流都必须限制在单个客户端,以避免乱序响应。
//注意:此类用于向服务器发出请求,而 TransportResponseHandler 负责处理来自服务器的响应。
//并发:线程安全,可以从多个线程调用。
public class TransportClient implements Closeable {//构造的时候给一个 Channel 就可以和Server端进行通信了//给一个TransportResponseHandler 就可以处理来自服务器的响应了public TransportClient(Channel channel, TransportResponseHandler handler) {this.channel = Preconditions.checkNotNull(channel);this.handler = Preconditions.checkNotNull(handler);this.timedOut = false;}public SocketAddress getSocketAddress() {return channel.remoteAddress();}//从远程端请求一个块,来自预先协商的streamId。//块指数从0开始。多次请求同一块是有效的,尽管某些流可能不支持此操作。//多个fetchChunk请求可能同时未完成,并且假设只使用单个TransportClient来获取块,则块保证会按照请求的顺序返回。public void fetchChunk(...){}//请求从远程端以给定的流ID流式传输数据。public void stream(String streamId, StreamCallback callback) {}//向服务器端的RpcHandler发送不透明消息。回调将随服务器的响应或在任何故障时调用。public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {}......}
6、Dispatcher
//消息调度器,负责将RPC消息路由到适当的端点。
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {//端点和消息的映射private val endpoints: ConcurrentMap[String, MessageLoop] =new ConcurrentHashMap[String, MessageLoop]//端点和引用的映射private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)//使用名字注册RpcEndpoint 放回对应的引用 //就是在endpoints加一条记录 (name -> RpcEndpoint)def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {......}//获取对应的引用def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)//向所有已注册的[[RpcEndpoint]]发送消息。这可用于使所有端点都知道网络事件(例如“新增了节点”)。def postToAll(message: InboxMessage): Unit = {......}//向远程端点发送消息def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {......}//向本地端点发送消息def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {......}//发送一个单向消息def postOneWayMessage(message: RequestMessage): Unit = {}//向特定端点发送消息private def postMessage(){}}
7、MessageLoop
//Dispatcher 用于向端点传递消息的消息循环。
private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Logging {//待消息循环处理的带有待处理消息的收件箱列表。private val active = new LinkedBlockingQueue[Inbox]()//消息循环任务;应该在消息循环池的所有线程中运行。protected val receiveLoopRunnable = new Runnable() {override def run(): Unit = receiveLoop()}//不停的处理inbox中的信息,private def receiveLoop(): Unit = {while (true) {val inbox = active.take()if (inbox == MessageLoop.PoisonPill) {// Put PoisonPill back so that other threads can see it.setActive(MessageLoop.PoisonPill)return}//处理inbox存储的消息inbox.process(dispatcher)}}}//使用共享线程池为多个RPC端点提供服务的消息循环。
private class DedicatedMessageLoop(...){}
8、Inbox
//一个收件箱,用于存储 RpcEndpoint 的消息,并安全地向其线程发布消息。
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)extends Logging {inbox => 给它一个别名,这样我们就可以在闭包中更清楚地使用它。protected val messages = new java.util.LinkedList[InboxMessage]()//OnStart是要处理的第一条消息inbox.synchronized {messages.add(OnStart)}//处理Inbox中的消息def process(dispatcher: Dispatcher): Unit = {message = messages.poll()while (true) {safelyCall(endpoint) {message match {case RpcMessage(_sender, content, context) =>endpoint.receiveAndReplycase OneWayMessage(_sender, content) =>endpoint.receive.applyOrElse[Any, Unit](content, { msg =>case OnStart =>endpoint.onStart() //调用端点的Onstart() Master 、 Worker 的Onstart() 就是这样起来的case OnStop =>dispatcher.removeRpcEndpointRef(endpoint)endpoint.onStop()case RemoteProcessConnected(remoteAddress) =>endpoint.onConnected(remoteAddress)case RemoteProcessDisconnected(remoteAddress) =>endpoint.onDisconnected(remoteAddress)case RemoteProcessConnectionError(cause, remoteAddress) =>endpoint.onNetworkError(cause, remoteAddress)}}}}}
9、OutBox
private[netty] class Outbox(nettyEnv: NettyRpcEnv, val address: RpcAddress) {outbox => //给它一个别名,这样我们就可以在闭包中更清楚地使用它。private val messages = new java.util.LinkedList[OutboxMessage]//connectFuture指向连接任务。如果没有连接任务,connectFuture将为空private var connectFuture: java.util.concurrent.Future[Unit] = null//发送消息。如果没有活动连接,请缓存它并启动新连接def send(message: OutboxMessage): Unit = {}//清空消息队列private def drainOutbox(): Unit = {}//异步启动要给连接任务private def launchConnectTask(): Unit = {}}