Spark RPC 学习总结

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

本文从API层面学习总结Spark RPC,暂不涉及源码分析。

Spark 通信历史

最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty。
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

废弃Akka的原因

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5293
主要原因是解决用户的Spark Application中akka版本和Spark内置的akka版本冲突的问题。比如,用户开发的Spark Application中用到了Spray框架,Spray依赖的akka版本跟Spark的不一致就会导致冲突:

  1. 很多Spark用户也使用Akka,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全一样的Akka版本,导致用户无法升级Akka。
  2. Spark的Akka配置是针对Spark自身来调优的,可能跟用户自己代码中的Akka配置冲突。
  3. Spark用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说少很多,debug比较容易。如果遇到什么bug,也可以自己马上fix,不需要等Akka上游发布新版本。而且,Spark升级Akka本身又因为第一点会强制要求用户升级他们使用的Akka,对于某些用户来说是不现实的。

参考:https://www.zhihu.com/question/61638635

RpcEnv

Rpc环境,RpcEndpoint 需要在 RpcEnv 中注册一个名称来接收消息。

先看源码中是如何创建 RpcEnv

  val securityMgr = new SecurityManager(conf)val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)def create(name: String,host: String,port: Int,conf: SparkConf,securityManager: SecurityManager,clientMode: Boolean = false): RpcEnv = {create(name, host, host, port, conf, securityManager, 0, clientMode)}def create(name: String,bindAddress: String,advertiseAddress: String,port: Int,conf: SparkConf,securityManager: SecurityManager,numUsableCores: Int,clientMode: Boolean): RpcEnv = {val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,numUsableCores, clientMode)new NettyRpcEnvFactory().create(config)}

本次测试用的代码

  def createRpcEnv(conf: SparkConf,name: String,port: Int,clientMode: Boolean = false): RpcEnv = {val config = RpcEnvConfig(conf, name, "localhost", "localhost", port,new SecurityManager(conf), 0, clientMode)new NettyRpcEnvFactory().create(config)}

clientMode: 是否客户端模式,默认false,默认会启动一个 NettyServer,具体在 TransportServer.init 中实现,可参考上篇文章。如果设置为true,则不启动服务,只作为一个客户端。
port: 为0时,会随机绑定一个端口号,这一点是Netty本身实现的,如果非0,则按照指定的端口绑定,但是要求端口号范围为[1024,65536),如果端口已占用,则尝试端口号+1,默认重试16次,可以通过配置 spark.port.maxRetries 修改最大重试次数 。

RpcEndpoint

很多都是RpcEndpoint的子类,如:MasterWorkerClientEndpointDriverEndpointCoarseGrainedExecutorBackendYarnCoarseGrainedExecutorBackendYarnDriverEndpointYarnSchedulerEndpoint 等。

RpcEndpoint 的生命周期:constructor -> onStart -> receive* -> onStop 。也就是首先会调用 onStart 方法。

RpcEndpoint 首先必须通过调用 rpcEnv.setupEndpoint 才能使用
setupEndpoint 使用名称注册 RpcEndpoint 并返回其 RpcEndpointRef

def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

RpcEndpointRef

RpcEndpointRef:远程 RpcEndpoint 的引用。RpcEndpointRef 是线程安全的

有两种方法可以返回RpcEndpointRef 一个是上面提到的setupEndpoint,另外一个则是 setupEndpointRef

  /*** Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.*/def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]/*** Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.*/def setupEndpointRefByURI(uri: String): RpcEndpointRef = {defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))}/*** Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.* This is a blocking action.*/def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)}

setupEndpoint 返回的是本地 RpcEndpoint 的引用,主要作用是使用名称注册
setupEndpointRef 根据远程地址和名称返回 RpcEndpoint 的引用。例如:

// Worer 中返回 Master 的引用
val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
// CoarseGrainedExecutorBackend 中获取 Driver 的引用
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl)

方法调用

  • rpcEnv.setupEndpoint : 调用 rpcEndpoint.onStart
  • rpcEndpointRef.send(没有返回值) : 调用 rpcEndpoint.receive
  • rpcEndpointRef.ask*(有返回值): 调用 rpcEndpoint.receiveAndReply (rpcEndpointRef.ask* 最终都是在 NettyRpcEnv.askAbortable中实现)
  • rpcEnv.stop(rpcEndpointRef) : 调用 rpcEndpoint.onStop

测试代码

完整代码:https://gitee.com/dongkelun/java-learning/tree/master/scala-learning/src/main/scala/org/apache/spark/rpc

本地测试

package org.apache.spark.rpcimport org.apache.spark.{SparkConf, SparkEnv}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}import scala.concurrent.duration._object RpcLocalTest extends RpcParent {def main(args: Array[String]): Unit = {val conf = new SparkConf()val env = createRpcEnv(conf, "local", 8000)@volatile var message: String = nullval rpcEndpointRef = env.setupEndpoint("send-locally", new RpcEndpoint {override val rpcEnv = envoverride def onStart(): Unit = {println("start hello endpoint")}override def receive = {case msg: String =>println(msg)message = msg}})rpcEndpointRef.send("hello")eventually(timeout(5.seconds), interval(10.milliseconds)) {assert("hello" == message)}if (env != null) {env.shutdown()}SparkEnv.set(null)}
}

远程测试

RpcRemoteServer

package org.apache.spark.rpcimport org.apache.spark.SparkConfimport java.util.concurrent.CountDownLatchobject RpcRemoteServer extends RpcParent {def main(args: Array[String]): Unit = {val shutdownLatch = new CountDownLatch(1)val env = createRpcEnv(new SparkConf(), "local", 8000)println(s"地址:${env.address}")env.setupEndpoint("ask-remotely", new RpcEndpoint {override val rpcEnv = envoverride def onStart(): Unit = {println("onStart 被调用")}override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {case msg: String =>context.reply(msg)}})shutdownLatch.await()}
}

RpcRemoteTest

package org.apache.spark.rpcimport org.apache.spark.SparkConfimport java.util.concurrent.CountDownLatchobject RpcRemoteTest extends RpcParent {def main(args: Array[String]): Unit = {val shutdownLatch = new CountDownLatch(1)val anotherEnv = createRpcEnv(new SparkConf(), "remote-client", 0, clientMode = true)println(s"地址:${anotherEnv.address}")val rpcEndpointRef = anotherEnv.setupEndpointRef(new RpcAddress("localhost", 8000), "ask-remotely")val reply = rpcEndpointRef.askSync[String]("hello Remote")println(reply)shutdownLatch.await()}
}

这里需要注意 RpcRemoteServerRpcEndpoint 的名称为 ask-remotely ,我们在 RpcRemoteTest 中不仅需要对应的IP、端口号,而且名称也一定要对应准确。


更多测试

完整代码:https://gitee.com/dongkelun/java-learning/blob/master/scala-learning/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

如测试 onStart 和 onStop

  test("onStart and onStop") {val stopLatch = new CountDownLatch(1)val calledMethods = mutable.ArrayBuffer[String]()val endpoint = new RpcEndpoint {override val rpcEnv = envoverride def onStart(): Unit = {println("onStart 被调用")calledMethods += "start"}override def receive: PartialFunction[Any, Unit] = {case msg: String =>}override def onStop(): Unit = {println("onStop 被调用")calledMethods += "stop"
//        stopLatch.countDown()}}println("调用setupEndpoint前")val rpcEndpointRef = env.setupEndpoint("start-stop-test", endpoint)println("调用setupEndpoint后")println("调用stop前")env.stop(rpcEndpointRef)println("调用stop后")stopLatch.await(10, TimeUnit.SECONDS)assert(List("start", "stop") === calledMethods)}

总结

  • 首先用 RpcEnv.create 创建 RpcEnv,这里底层会通过 Netty 创建一个 Server, 绑定对应的端口,这里也可以只使用客户端模式不创建 Server
  • 然后具体通信的实体类是在 RpcEndpoint 中实现,比如 MasterWorker 等都是 RpcEndpoint, RpcEndpoint 首先必须通过调用 rpcEnv.setupEndpoint 才能使用。
  • RpcEndpoint 的方法调用都是通过它对应引用 RpcEndpointRef 实现, rpcEnv.setupEndpoint 会返回本地引用,setupEndpointRef 根据远程地址和名称返回远程 RpcEndpoint 的引用,注意这里名称一定要对应准确。
  • RpcEndpoint 的方法调用顺序 onStart -> receive* -> onStop ,其中 onStart 做一些初始化的准备,setupEndpoint 会触发 onStart 方法;receive 方法没有返回值,receiveAndReply 方法有返回值,分别通过 rpcEndpointRef.sendrpcEndpointRef.ask* 触发,ask方法分同步调用和异步调用;而 onStop 则处理服务停止后的操作,通过 rpcEnv.stop 触发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/4365.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu20.04取消root账号自动登录的方法,触觉智能RK3568开发板演示

Ubuntu20.04默认情况下为root账号自动登录,本文介绍如何取消root账号自动登录,改为通过输入账号密码登录,使用触觉智能EVB3568鸿蒙开发板演示,搭载瑞芯微RK3568,四核A55处理器,主频2.0Ghz,1T算力…

C/C++内存管理(超详解)

目录 1.C/C内存分布 2.C语言动态内存管理 2.1 malloc 2.2 free 2.3 calloc 2.4 realloc 3.C动态内存管理 3.1new/delete操作内置类型 3.2new/delete操作自定义类型 3.3operator new与operator delete函数 3.4定位new表达式(placement-new) 1.C/C内存分布 内存中是如…

[Python学习日记-78] 基于 TCP 的 socket 开发项目 —— 模拟 SSH 远程执行命令

[Python学习日记-78] 基于 TCP 的 socket 开发项目 —— 模拟 SSH 远程执行命令 简介 项目分析 如何执行系统命令并拿到结果 代码实现 简介 在Python学习日记-77中我们介绍了 socket 基于 TCP 和基于 UDP 的套接字,还实现了服务器端和客户端的通信,本…

AIGC视频生成模型:Meta的Emu Video模型

大家好,这里是好评笔记,公主号:Goodnote,专栏文章私信限时Free。本文详细介绍Meta的视频生成模型Emu Video,作为Meta发布的第二款视频生成模型,在视频生成领域发挥关键作用。 🌺优质专栏回顾&am…

IO进程----进程

进程 什么是进程 进程和程序的区别 概念: 程序:编译好的可执行文件 存放在磁盘上的指令和数据的有序集合(文件) 程序是静态的,没有任何执行的概念 进程:一个独立的可调度的任务 执行一个程序分配资…

flutter 使用google_mlkit_image_labeling做图片识别

在AI横行的如今,相信大家或多或少都做过跟AI接轨的需求了吧?今天我说的是关于图片识别的需求,flutter的专属图片识别插件google_mlkit_image_labeling。 google_mlkit_image_labeling它是Google旗下的Google Cloud Vision API中分支出来的一部…

elasticsearch基础

分布式搜索引擎01 1. 初始elasticsearch 1.1. 了解ES 1.1.1. elasticsearch的作用 elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容 例如: 在github搜索代码: 在电…

SQL Server查询计划操作符——查询计划相关操作符(4)

7.3. 查询计划相关操作符 28)Declare:该操作符在查询计划中分配一个本地变量。该操作符是一个语言元素。该操作符具体如图7.2-28所示。 图 7.2-28 查询计划操作符Declare示例 29)Delete:该操作符从一个对象中删除满足其参数列中可选谓词的数据行。该操作符具体如图7.2-29…

C++的auto_ptr智能指针:从诞生到被弃用的历程

C作为一种功能强大的编程语言,为开发者提供了众多便捷的特性和工具,其中智能指针是其重要特性之一。智能指针能够自动管理内存,有效避免内存泄漏等常见问题。然而,并非所有智能指针都尽善尽美,auto_ptr便是其中的一个例…

[手机Linux] 七,NextCloud优化设置

安装完成后在个人设置里发现很多警告,一一消除。 只能一条一条解决了。 关于您的设置有一些错误。 1,PHP 内存限制低于建议值 512 MB。 设置php配置文件: /usr/local/php/etc/php.ini 把里面的: memory_limit 128M 根据你自…

微软宣布Win11 24H2进入新阶段!设备将自动下载更新

快科技1月19日消息,微软于1月16日更新了支持文档,宣布Windows 11 24H2进入新阶段。 24H2更新于2024年10月1日发布,此前为可选升级,如今微软开始在兼容的Windows 11设备上自动下载并安装24H2版本。 微软表示:“运行Wi…

ddl-auto: create

package com.test.entity;import jakarta.persistence.*; import lombok.*; import org.hibernate.annotations.Comment;import java.time.Instant; import java.util.Objects;Comment("操作日志表") Entity // Entity注解的类将会初始化为一张数据库表 Table(name …

循环队列(C语言)

从今天开始我会开启一个专栏leetcode每日一题,大家互相交流代码经验,也当作我每天练习的自我回顾。第一天的内容是leetcode622.设计循环队列。 一、题目详细 设计你的循环队列实现。 循环队列是一种线性数据结构,其操作表现基于 FIFO&#…

Golang Gin系列-1:Gin 框架总体概述

本文介绍了Gin框架,探索了它的关键特性,并建立了简单入门的应用程序。在这系列教程里,我们会探索Gin的主要特性,如路由、中间件、数据库集成等,最终能使用Gin框架构建健壮的web应用程序。 总体概述 Gin是Go编程语言的…

在线宠物用品|基于vue的在线宠物用品交易网站(源码+数据库+文档)

|在线宠物用品交易网站 目录 基于springbootvue的在线宠物用品交易网站 一、前言 二、系统设计 三、系统功能设计 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取: 博主介绍:✌️大厂码农|毕设布道师&am…

鸿蒙安装HAP时提示“code:9568344 error: install parse profile prop check error” 问题现象

在启动调试或运行应用/服务时,安装HAP出现错误,提示“error: install parse profile prop check error”错误信息。 解决措施 该问题可能是由于应用使用了应用特权,但应用的签名文件发生变化后未将新的签名指纹重新配置到设备的特权管控白名…

图像去雾数据集的下载和预处理操作

前言 目前,因为要做对比实验,收集了一下去雾数据集,并且建立了一个数据集的预处理工程。 这是以前我写的一个小仓库,我决定还是把它用起来,下面将展示下载的路径和数据处理的方法。 下面的代码均可以在此找到。Auo…

React的应用级框架推荐——Next、Modern、Blitz等,快速搭建React项目

在 React 企业级应用开发中,Next.js、Modern.js 和 Blitz 是三个常见的框架,它们提供了不同的特性和功能,旨在简化开发流程并提高应用的性能和扩展性。以下是它们的详解与比较: Next、Modern、Blitz 1. Next.js Next.js 是由 Ve…

内网渗透测试工具及渗透测试安全审计方法总结

1. 内网安全检查/渗透介绍 1.1 攻击思路 有2种思路: 攻击外网服务器,获取外网服务器的权限,接着利用入侵成功的外网服务器作为跳板,攻击内网其他服务器,最后获得敏感数据,并将数据传递到攻击者&#xff0…

Weblogic - General - 弱口令 任意文件读取漏洞

0x01:漏洞简介 首先需要说明,本文并不是介绍了 Weblogic 某一 CVE 漏洞,而是提供了一种通用的测试思路。 0x0101:弱口令漏洞 弱口令漏洞主要是由于用户安全意识淡薄,为了便于记忆,设置了强度过低的密码&…