使用Akka的Actor模拟Spark的Master和Worker工作机制

使用Akka的Actor模拟Spark的Master和Worker工作机制

Spark的Master和Worker协调工作原理

在 Apache Spark 中,Master 和 Worker 之间通过心跳机制进行通信和保持活动状态。下面是 Master 和 Worker 之间心跳机制的工作流程:

  1. Worker 启动后,会向预先配置的 Master 节点发送注册请求。
  2. Master 接收到注册请求后,会为该 Worker 创建一个唯一的标识符(Worker ID)并将其信息保存在内存中。
  3. Master 向 Worker 发送包含 Master URL、Worker ID 等信息的注册响应。
  4. Worker 收到注册响应后,会启动一个定时器并开始周期性地向 Master 发送心跳消息。
  5. Worker 的心跳消息中包含当前的负载状况、可用资源等信息。
  6. Master 接收到心跳消息后,更新该 Worker 的最近心跳时间,并根据需要对集群进行动态调整,例如添加新的任务或删除故障的 Worker。
  7. 如果 Master 在一段时间内没有收到某个 Worker 的心跳消息,它将把该 Worker 标记为失效,并将其相应的资源标记为可用以供后续使用。

具体原理如下:

  • Worker 通过网络向 Master 发送心跳消息,通常使用基于 TCP 的长连接。这些心跳消息可以包含有关 Worker 健康状况、资源利用情况等的信息。
  • Master 使用一个内部的心跳管理组件来处理接收到的心跳消息,并维护每个 Worker 的状态。它根据心跳消息的频率和时间戳来判断 Worker 是否正常运行。
  • 如果 Master 在预定的时间内没有收到 Worker 的心跳消息,它会将该 Worker 标记为失效并触发一系列的故障处理机制,例如重新分配任务给其他可用的 Worker。
  • Worker 定期发送心跳消息,以确保在网络故障、Worker 故障或其他问题发生时能够及时通知 Master。

通过心跳机制,Master 能够实时监控 Worker 的状态,并根据需要进行集群的动态管理和资源调度,从而实现高可用性和容错性。

使用Akka的Actor模拟Spark的Master和Worker工作机制

  1. worker注册到Master, Master完成注册,并回复worker注册成功。
  2. worker定时发送心跳,并在Master接收到。
  3. Master接收到worker心跳后,要更新该worker的最近一次发送心跳的时间。
  4. 给Master启动定时任务,定时检测注册的worker有哪些没有更新心跳,并将其从hashmap中删除。
  5. master worker 进行分布式部署(Linux系统)-》如何给maven项目打包->上传linux。
  • 创建SparkMaster类继承Actor特质,实现Receive方法,并定义对应的伴生对象,在伴生对象中创建SparkMaster-actor引用,并启动Actor发送消息。服务端Master对worker进行心跳监测,发现6秒内无法获取worker心跳,将异常的Worker的实例从HashMap中移除。若能正常获取到心跳,则获取心跳信息后更新心跳时间。定时保持心跳机制。

代码实现:

class SparkMaster extends Actor {//定义个hashMap,管理workers(所有worker的实例)val workers = mutable.Map[String, WorkerInfo]()override def receive: Receive = {case "start" => {println("master服务器启动了...")//这里开始。。self ! StartTimeOutWorker}case RegisterWorkerInfo(id, cpu, ram) => {//接收到worker注册信息if (!workers.contains(id)) {//创建WorkerInfo 对象val workerInfo = new WorkerInfo(id, cpu, ram)//加入到workersworkers += ((id, workerInfo))println("服务器的workers=" + workers)//回复一个消息,说注册成功sender() ! RegisteredWorkerInfo}}case HeartBeat(id) => {//更新对应的worker的心跳时间//1.从workers对应的HashMap中取出WorkerInfo,然后更新worker心跳时间val workerInfo = workers(id)workerInfo.lastHeartBeat = System.currentTimeMillis()println("master更新了 " + id + " 心跳时间...")}case StartTimeOutWorker => {println("开始了定时检测worker心跳的任务")import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 9000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. RemoveTimeOutWorker 发送的内容context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)}//对RemoveTimeOutWorker消息处理//这里需求检测哪些worker心跳超时(now - lastHeartBeat > 6000),并从map中删除case RemoveTimeOutWorker => {//首先将所有的 workers 的 所有WorkerInfoval workerInfos = workers.valuesval nowTime = System.currentTimeMillis()//先把超时的所有workerInfo,删除即可workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeat) > 6000).foreach(workerInfo => workers.remove(workerInfo.id))println("当前有 " + workers.size + " 个worker存活的")}}
}object SparkMaster {def main(args: Array[String]): Unit = {//这里我们分析出有3个host,port,sparkMasterActorif (args.length != 3) {println("请输入参数 host port sparkMasterActor名字")sys.exit()}val host = args(0)val port = args(1)val name = args(2)//先创建ActorSystemval config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${host}|akka.remote.netty.tcp.port=${port}""".stripMargin)val sparkMasterSystem = ActorSystem("SparkMaster", config)//创建SparkMaster -actorval sparkMasterRef = sparkMasterSystem.actorOf(Props[SparkMaster], s"${name}")//启动SparkMastersparkMasterRef ! "start"}
}
  • 定义SparkWorker类继承Actor特质,实现Receive方法,在方法中实现向master发送注册信息的请求,获取到服务端Master注册成功的消息后,定义定时任务发送心跳包给Master。
class SparkWorker(masterHost:String,masterPort:Int,masterName:String) extends Actor{//masterProxy是Master的代理/引用refvar masterPorxy :ActorSelection = _val id = java.util.UUID.randomUUID().toStringoverride def preStart(): Unit = {println("preStart()调用")//初始化masterPorxymasterPorxy = context.actorSelection(s"akka.tcp://SparkMaster@${masterHost}:${masterPort}/user/${masterName}")println("masterProxy=" + masterPorxy)}override def receive:Receive = {case "start" => {println("worker启动了")//发出一个注册消息masterPorxy ! RegisterWorkerInfo(id, 16, 16 * 1024)}case RegisteredWorkerInfo => {println("workerid= " + id + " 注册成功~")//当注册成功后,就定义一个定时器,每隔一定时间,发送SendHeartBeat给自己import context.dispatcher//说明//1. 0 millis 不延时,立即执行定时器//2. 3000 millis 表示每隔3秒执行一次//3. self:表示发给自己//4. SendHeartBeat 发送的内容context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)}case SendHeartBeat =>{println("worker = " + id + "给master发送心跳")masterPorxy ! HeartBeat(id)}}
}object SparkWorker {def main(args: Array[String]): Unit = {if (args.length != 6) {println("请输入参数 workerHost workerPort workerName masterHost masterPort masterName")sys.exit()}val workerHost = args(0)val workerPort = args(1)val workerName = args(2)val masterHost = args(3)val masterPort = args(4)val masterName = args(5)val config = ConfigFactory.parseString(s"""|akka.actor.provider="akka.remote.RemoteActorRefProvider"|akka.remote.netty.tcp.hostname=${workerHost}|akka.remote.netty.tcp.port=${workerPort}""".stripMargin)//创建ActorSystemval sparkWorkerSystem = ActorSystem("SparkWorker",config)//创建SparkWorker 的引用/代理val sparkWorkerRef = sparkWorkerSystem.actorOf(Props(new SparkWorker(masterHost, masterPort.toInt,masterName)), s"${workerName}")//启动actorsparkWorkerRef ! "start"}
}
  • 分别定义发送注册信息的RegisterWorkerInfo的样例类,WorkerInfo消息类,定义注册成功的消息样例对象RegisteredWorkerInfo,心跳信息样例类HeartBeat,以及确认发送心跳信息样例对象SendHeartBeat,触发超时work的样例对象StartTimeOutWorker,移除超时worker的样例对象RemoveTimeOutWorker。

代码如下:

// worker注册信息 //MessageProtocol.scala
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)// 这个是WorkerInfo, 这个信息将来是保存到master的 hm(该hashmap是用于管理worker)
// 将来这个WorkerInfo会扩展(比如增加worker上一次的心跳时间)
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {var lastHeartBeat : Long = System.currentTimeMillis()
}// 当worker注册成功,服务器返回一个RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo//worker每隔一定时间由定时器发给自己的一个消息
case object SendHeartBeat
//worker每隔一定时间由定时器触发,而向master发现的协议消息
case class HeartBeat(id: String)//master给自己发送一个触发检查超时worker的信息
case object StartTimeOutWorker
// master给自己发消息,检测worker,对于心跳超时的.
case object RemoveTimeOutWorker

运行效果:
在这里插入图片描述
通过这个案例我们可以深入理解Spark的Master和Worker的通讯机制,为了方便以后对Spark的底层源码的学习,命名的方式和源码保持一致.(如: 通讯消息类命名就是一样的);同时也加深了我们对主从服务心跳检测机制(HeartBeat)的理解,方便以后spark源码二次开发。

Spark的wordCount图解

  • 通过图解可以理解RDD对数据和计算逻辑进行封装,RDD的链式操作,数据流转过程。

在这里插入图片描述

RDD理解

  • RDD的数据处理方式类似于IO流,也有装饰者设计模式。
  • RDD的数据只有在调用collect方法时,才会真正执行业务逻辑操作。之前的封装全部都是功能的扩展。
  • RDD是不保存数据的,但是IO可以临时保存一部分数据。

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据
处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行
计算的集合。
➢ 弹性
1)存储的弹性:内存与磁盘的自动切换;
2)错的弹性:数据丢失可以自动恢复;
3)计算的弹性:计算出错重试机制;
4)分片的弹性:可根据需要重新分片。(分片可以理解为分区)

➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD 封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现
➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的 RDD 里面封装计算逻辑。
➢ 可分区、并行计算


假设基于Yarn-Client模式,Driver运行在client客户端主机,这里图解Driver与每个计算节点Executor的协作工作的基本原理:这里假设只有一个RDD。
总结:
RDD是一种封装了计算逻辑的弹性数据集,它是基于内存的多分区并行计算的抽象数据模型。
在这里插入图片描述
Spark On Yarn-Client 模式
用于监控和调度的 Driver 模块会在客户端本地主机启动,在客户端执行,而不是在 Yarn 中,所以一般用于测试。

➢ Driver 在任务提交的本地机器上运行

➢ Driver 启动后会和 ResourceManager 通讯申请启动 ApplicationMaster

➢ ResourceManager 分配 container,在合适的 NodeManager 上启动 ApplicationMaster,负责向 ResourceManager 申请 Executor 内存。

➢ ResourceManager 接到 ApplicationMaster 的资源申请后会分配container,然后ApplicationMaster 在资源分配指定的 NodeManager 上启动 Executor 进程。

➢ Executor 进程启动后会向 Driver 反向注册,Executor 全部注册完成后 Driver 开始执行main 函数。

➢ 之后执行到 Action 算子时,触发一个 Job,并根据宽依赖开始划分 stage,每个 stage 生成对应的 TaskSet,然后的个数是有每个阶段的最后一个RDD的分区数决定的;之后将 task 分发到各个 Executor 上执行,最终将执行结果输出;如何需要将每个Executor的结果返回到Driver聚合,就需要使用累加器完成了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/124991.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 7 第九讲 微服务集成Redis 应用篇

Jedis 理论 Jedis是redis的java版本的客户端实现,使用Jedis提供的Java API对Redis进行操作,是Redis官方推崇的方式;并且,使用Jedis提供的对Redis的支持也最为灵活、全面;不足之处,就是编码复杂度较高。 …

如何选择合适的HTTP代理服务器

HTTP代理服务器是一种常见的网络代理方式,它可以帮助用户隐藏自己的IP地址,保护个人隐私和安全。然而,选择合适的HTTP代理服务器并不容易,需要考虑多个因素。本文将介绍如何选择合适的HTTP代理服务器。 了解代理服务器的类型 HTT…

中使用pack局管理器:管理器布置小部件

一、说明 在本教程中,我们将了解如何制作登录 UI。今天的教程将重点介绍如何使用 Tkinter 的pack布局管理器。 二、设计用户界面 什么是布局管理器?创建图形界面时,窗口中的小部件必须具有相对于彼此排列的方式。例如,可以使用微件…

Vue + Element UI 前端篇(十一):第三方图标库

Vue Element UI 实现权限管理系统 前端篇(十一):第三方图标库 使用第三方图标库 用过Elment的同鞋都知道,Element UI提供的字体图符少之又少,实在是不够用啊,幸好现在有不少丰富的第三方图标库可用&…

Python 网页爬虫原理及代理 IP 使用

目录 前言 一、Python 网页爬虫原理 二、Python 网页爬虫案例 步骤1:分析网页 步骤2:提取数据 步骤3:存储数据 三、使用代理 IP 四、总结 前言 随着互联网的发展,网络上的信息量变得越来越庞大。对于数据分析人员和研究人…

【多思路附源码】2023高教社杯 国赛数学建模C题思路 - 蔬菜类商品的自动定价与补货决策

赛题介绍 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬…

【容器vs虚拟机】

容器vs虚拟机 为什么用虚拟机什么是容器容器vs虚拟机 Docker被称为是轻量级的虚拟化。 首先,一般开发所需要的都是Linux环境,但我们大多数人的电脑都是Windows系统。所以要安装虚拟机,目的是为了在我们当前所使用的Windows上面安装上Linux环境…

conda创建python虚拟环境

1.查看当前存在那些虚拟环境 conda env list conda info -e 2.conda安装虚拟环境 conda create -n my_env_name python3.6 2.1在anaconda下改变python版本 当前3.7 安装3.7 conda create -n py37 python3.7 conda activate py37 conda create -n py37 python3.7conda a…

R语言入门——line和lines的区别

目录 0 引言一、 line()二、 lines() 0 引言 首先,从直观上看,lines比line多了一个s,但它们还是有很大的区别的,下面将具体解释这个两个函数的区别。 一、 line() 从R语言的帮助文档中找到,line()的使用&#xff0c…

微服务架构基础--第4章Spring Boot核心功能2

第4章Spring Boot核心功能2 一.预习笔记 1.静态资源访问 1-1:resource下的static文件夹会被视为默认的根目录(默认静态资源文件夹) 1-2:index.html是SpringBoot的默认首页(默认配置了的) 1-3:修改网页logo&#xf…

Golang RSA 生成密钥、加密、解密、签名与验签

文章目录 1.RSA2.Golang 实现 RSA生成密钥加密解密签名验签 3.dablelv/cyan参考文献 1.RSA RSA 是最常用的非对称加密算法,由 Ron Rivest、Adi Shamir、Leonard Adleman 于1977 年在麻省理工学院工作时提出,RSA 是三者姓氏首字母的拼接。 它的基本原理…

微服务01-基本介绍+注册中心EureKa

基本介绍 服务集群:一个请求由多个服务完成,服务接口暴露,以便于相互调用; 注册中心:每个服务的状态,需要进行维护,我们可以在注册中心进行监控维护服务; 配置中心:这些…

失效的访问控制漏洞复现(dvwa)

文章目录 失效访问控制是什么?dvwa漏洞复现用未授权访问获取shell 代码审计 失效访问控制是什么? 由于缺乏自动化的检测和应用程序开发人员缺乏有效 的功能测试,因而访问控制缺陷很常见。导致攻击者可以冒充用户、管理员或拥有特权的用户&…

Android 10.0 禁用adb shell input输入功能

1.前言 在10.0的产品开发中,在进行一些定制开发中,对于一些adb shell功能需要通过属性来控制禁止使用input 等输入功能,比如adb shell input keyevent 响应输入事件等,所以就需要 熟悉adb shell input的输入事件流程,然后来禁用adb shell input的输入事件功能,接下来分…

GPT-人工智能如何改变我们的编码方式

在本文中,您将找到我对人工智能和工作的最新研究的总结(探索人工智能对生产力的影响,同时开启对长期影响的讨论),一个准实验方法的示例(通过 ChatGPT 和 Stack Overflow 进行说明,了解如何使用简…

python in excel 如何尝鲜 有手就行

众所周知,微软在8月下旬放出消息python已入驻excel,可到底怎么实现呢。 今天我就将发布python in excel的保姆级教程,开始吧! 获取office 365 账号 首先我们要有微软office365 这时候需要再万能的某宝去找一个账号,…

已经2023年了,你还不会手撕轮播图?

目录 一、前言二、动画基础1. 定时器2. left与offsetLeft3. 封装函数3.1 物体3.2 目标点3.3 回调函数 4.封装 三、基础结构3.1 焦点图3.2 按钮3.3 小圆点3.4 总结 四、按钮显示五、圆点5.1 生成5.2 属性5.3 移动 六、按钮6.1 准备6.2 出错6.2.1 小圆点跟随6.2.2 图片返回 6.3 b…

云端AI:释放企业创新力,打造智慧企业

文章目录 1. 云端AI的基本概念1.1 云计算1.2 人工智能1.3 云端AI 2. 云端AI的重要性2.1 成本效益2.2 弹性扩展2.3 无缝整合2.4 实时更新 3. 云端AI的应用领域3.1 智能客服3.2 预测分析3.3 自动化生产 4. 云端AI的未来趋势4.1 边缘计算与云端AI的融合4.2 可解释性AI4.3 隐私和安…

zabbix使用 -- 添加监控节点、自定义监控项、触发器

目录 页面中的一些概念配置agent服务来获取目标主机数据对nginx服务器进行监控在网页中添加一台配置 自定义监控项 -- 以监控nginx为例1、开启nginx本身的统计功能2、编写脚本采集数据3、在zabbix-server里获取数据监控ssh进程监控cron进程 触发器报警1、注册一个企业微信2、微…

开启全新教学模式!vLive虚拟直播如何赋能线上教培

 如今,教培领域正在经历一场数字化的变革。随着科技的迅猛发展,教培形式也在不断演变,越来越多的企业和讲师开始采用虚拟直播来进行在线教学。那么,vLive虚拟直播https://live.vsochina.com/cnvLive虚拟直播是如何赋…