使用Akka的Actor模拟Spark的Master和Worker工作机制
Spark的Master和Worker协调工作原理
在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:
- Worker 启动后,会向预先配置的 Master 节点发送注册请求。
- Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
- Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
- Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
- Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
- Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
- 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。
具体原理如下:
- Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
- Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
- 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
- Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。
通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。
使用Akka的Actor模拟Spark的Master和Worker工作机制
- worker注册到Master, Master完成注册,并回复worker注册成功。
- worker定时发送心跳,并在Master接收到。
- Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
- 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
- master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
- 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。
代码实现:
class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => {println("master服务器启动了...")//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += ((id, workerInfo))println("服务器的workers=" + workers)//回复一个消息,说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) => {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo = workers(id)workerInfo.lastHeartBeat = System.currentTimeMillis()println("master更新了 " + id + " 心跳时间...")}case StartTimeOutWorker => {println("开始了定时检测worker心跳的任务")import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除case RemoveTimeOutWorker => {//首先将所有的 workers 的 所有WorkerInfoval workerInfos = workers.valuesval nowTime = System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000).foreach(workerInfo => workers.remove(workerInfo.id))println("当前有 " + workers.size + " 个worker存活的")}}
}object SparkMaster {def main(args: Array[String]): Unit = {//这里我们分析出有3个host,port,sparkMasterActorif (args.length != 3) {println("请输入参数 host port sparkMasterActor名字")sys.exit()}val host = args(0)val port = args(1)val name = args(2)//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${host}|akka.remote.netty.tcp.port=${port}""".stripMargin)val sparkMasterSystem = ActorSystem("SparkMaster", config)//创建SparkMaster -actorval sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")//启动SparkMastersparkMasterRef ! "start"}
}
- 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection = _val id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterPorxymasterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")println("masterProxy=" + masterPorxy)}override def receive:Receive = {case "start" => {println("worker启动了")//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid= " + id + " 注册成功~")//当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat =>{println("worker = " + id + "给master发送心跳")masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit = {if (args.length != 6) {println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")sys.exit()}val workerHost = args(0)val workerPort = args(1)val workerName = args(2)val masterHost = args(3)val masterPort = args(4)val masterName = args(5)val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${workerHost}|akka.remote.netty.tcp.port=${workerPort}""".stripMargin)//创建ActorSystemval sparkWorkerSystem = ActorSystem("SparkWorker",config)//创建SparkWorker 的引用/代理val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")//启动actorsparkWorkerRef ! "start"}
}
- 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。
代码如下:
// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long = System.currentTimeMillis()
}// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker
运行效果:
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。
Spark的wordCount图解
- 通过图解可以理解RDD对数据和计算逻辑进行封装,RDD的链式操作,数据流转过程。
RDD理解
- RDD的数据处理方式类似于IO流,也有装饰者设计模式。
- RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前的封装全部都是功能的扩展。
- RDD是不保存数据的,但是IO可以临时保存一部分数据。
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据
处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行
计算的集合。
➢ 弹性
1)存储的弹性:内存与磁盘的自动切换;
2)错的弹性:数据丢失可以自动恢复;
3)计算的弹性:计算出错重试机制;
4)分片的弹性:可根据需要重新分片。(分片可以理解为分区)
➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD 封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现
➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑。
➢ 可分区、并行计算
假设基于Yarn-Client模式,Driver运行在client客户端主机,这里图解Driver与每个计算节点Executor的协作工作的基本原理:这里假设只有一个RDD。
总结:
RDD是一种封装了计算逻辑的弹性数据集,它是基于内存的多分区并行计算的抽象数据模型。
Spark On Yarn-Client 模式
用于监控和调度的 Driver 模块会在客户端本地主机启动,在客户端执行,而不是在 Yarn 中,所以一般用于测试。
➢ Driver 在任务提交的本地机器上运行
➢ Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster
➢ ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存。
➢ ResourceManager 接到 ApplicationMaster 的资源申请后会分配container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程。
➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main 函数。
➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,然后的个数是有每个阶段的最后一个RDD的分区数决定的;之后将 task 分发到各个 Executor 上执行,最终将执行结果输出;如何需要将每个Executor的结果返回到Driver聚合,就需要使用累加器完成了。