Spark原理

主要包括:

  • 核心组件的运行机制(Master,Worker,SparkContext等)
  • 任务调度的原理
  • Shuffile的原理
  • 内存管理
  • 数据倾斜处理
  • Spark优化

核心组件的运行机制

Spark 执行任务的原理:

Spark on Yarn:

Cluster模型:

Client模型:

Master Worker通信原理

RpcEnv是RPC的环境对象,管理着整个 RpcEndpoint 的生命周期,其主要功能有:根据name或uri注册endpoints、管理各种消息的处理、停止endpoints。其中RpcEnv只能通过RpcEnvFactory创建得到。 RpcEnv中的核心方法:

// RpcEndpoint 向 RpcEnv 注册 
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
// 根据参数信息,从 RpcEnv 中获得一个远程的RpcEndpoint 
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef

RpcEndpoint:是一个特质,表示一个消息通信体,可以接收、发送、处理消息

生命周期为:construct(构建)->onStart(运行)->receive*(接收消息)->onStop(停止)

Spark的MasterWorker都实现了RpcEndpoint这个特征

RpcEndPointRef:RpcEndPointRef是对远程RpcEndpoint的一个引用,当需要向一个具体的RpcEndpoint发送消息时,需要获取到该RpcEndpoint的引用,然后通过该引用发送消息。

备注*:在Spark源码中,看到private[spark],意思是在spark的包是私有的,除了在spark之外包不能使用

SparkEnv

SparkEnv是Spark计算层的基石,不管是Driver还是Executor,都需要依赖SparkEnv来进行计算,它是Spark的运行环境对象。

看下SparkEnv源码中的构造方法:

class SparkEnv (val executorId: String,---------------------->Executor 的idprivate[spark] val rpcEnv: RpcEnv, ---------------------->通信组件,使SparkEnv具备通信能力val serializer: Serializer,val closureSerializer: Serializer,val serializerManager: SerializerManager,---------------------->序列化管理器val mapOutputTracker: MapOutputTracker,---------------------->map阶段输出追踪器val shuffleManager: ShuffleManager,---------------------->Shuffle管理器val broadcastManager: BroadcastManager,---------------------->广播管理器val blockManager: BlockManager,---------------------->块管理器val securityManager: SecurityManager,---------------------->安全管理器val metricsSystem: MetricsSystem,---------------------->度量系统val memoryManager: MemoryManager,---------------------->内存管理器val outputCommitCoordinator: OutputCommitCoordinator,---------------------->输出提交协调器val conf: SparkConf

从SparkEnv的成员变量可以验证,SparkEnv包含了Spark运行的很多重要组件

SparkEnv的单例对象:

SparkEnv单例对象在JVM是单例的,集群情况下Driver和Executor独自的jvm进程,它们都有各自的SparkEnv单例对象

SparkContext

SparkContext使Spark功能的主要入口点,主要代标与spark的连接,能够用来在集群上创建RDD、累加器、广播变量。每个JVM里之恶能存在一个处于激活状态的SparkContext,在创建新的SparkContext之前必须调用stop()来关闭之前的SparkContext。

源码里SparkContext的一些成员变量

  //spark的配置,本质上是一个并行map集合private var _conf: SparkConf = _private var _eventLogDir: Option[URI] = Noneprivate var _eventLogCodec: Option[String] = Noneprivate var _listenerBus: LiveListenerBus = _//构建整个环境,从上面分析的SparkEnv源码可知,SparkContext具备通信,广播,度量等能力private var _env: SparkEnv = _//状态追踪private var _statusTracker: SparkStatusTracker = _//进度条private var _progressBar: Option[ConsoleProgressBar] = Noneprivate var _ui: Option[SparkUI] = Noneprivate var _hadoopConfiguration: Configuration = _private var _executorMemory: Int = _private var _schedulerBackend: SchedulerBackend = _private var _taskScheduler: TaskScheduler = _//通信private var _heartbeatReceiver: RpcEndpointRef = _@volatile private var _dagScheduler: DAGScheduler = _//应用idprivate var _applicationId: String = _private var _applicationAttemptId: Option[String] = Noneprivate var _eventLogger: Option[EventLoggingListener] = None//动态资源分配private var _executorAllocationManager: Option[ExecutorAllocationManager] = None//资源的清理private var _cleaner: Option[ContextCleaner] = Noneprivate var _listenerBusStarted: Boolean = falseprivate var _jars: Seq[String] = _private var _files: Seq[String] = _private var _shutdownHookRef: AnyRef = _private var _statusStore: AppStatusStore = _

SparkContext初始化

调度的三大组件启动流程

  • DAGScheduler(高层调度器,class):负责将DAG拆分成不同Stage(TaskSet),然后提交给TaskScheduler进行具体处理
  • TaskScheduler(底层调度器,trait,只有一种实现TaskSchedulerImpl):负责实际每个具体Task的物理调度执行
  • SchedulerBackend(trait):有多种实现,分别对应不同的资源管理器
    • Standalone模式下,其实现为:StandaloneSchedulerBackend
    • 对应Yarn
    • 。。。。
启动流程:
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
------>SparkContext.createTaskScheduler#
-----------> case SPARK_REGEX(sparkUrl) =>
-----------> val scheduler = new TaskSchedulerImpl(sc)//创建scheduler
-----------> val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
----------->scheduler.initialize(backend)
----------------->TaskSchedulerImpl#initialize //初始化
-----------------------> this.backend = backend//赋值SchedulerBackend
-----------------------> schedulableBuilder={match}//选择调度策略
----------->(backend, scheduler)
------>_schedulerBackend = sched//给_schedulerBackend赋值
------>_taskScheduler = ts//给_taskScheduler赋值
------>_dagScheduler = new DAGScheduler(this)//_dagScheduler
------>_taskScheduler.start()//启动_taskScheduler
----------->TaskSchedulerImpl#start()
----------------->backend.start()//启动backend
----------------------->StandaloneSchedulerBackend#start//启动
---------------------------->super.start()
----------------------------------->CoarseGrainedSchedulerBackend#start
--------------------------------------->driverEndpoint = createDriverEndpointRef(properties)//创建Driver的Endpoint
---------------------------------------------->DriverEndpoint#onStart//自动执行onStart
---------------------------------------------------->reviveThread.scheduleAtFixedRate//间隔1s执行
---------------------------------------------------------->  Option(self).foreach(_.send(ReviveOffers))//自己给自己发消息
---------------------------------------------->case ReviveOffers =>makeOffers()//接收搭配ReviveOffers这个消息触发
---------------------------------------------->makeOffers()//调度的是底层的资源 只有资源没有任务  offer-》Taskschedule(负责将底层的资源和任务结合起来)
-------------------------------------------------->scheduler.resourceOffers(workOffers)//将集群的资源以Offer的方式发给上层的TaskSchedulerImpl。
-----------------> client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)//创建 StandaloneAppClient
----------------->client.start()
----------------->ClientEndpoint#oNstart=>registerWithMaster(1)//应用程序向 Master 注册

根据以上的方法执行栈可以得出:SparkContext初始化的过程中完成了TaskScheduler,SchedulerBackend,DAGScheduler三个组件的初始化,在初始化的过程中会向master发送注册消息,Driver会周期性的给自己发送消息,调度底层的资源,将集群中的资源以offer的形式发给TaskSchedulerImpl,TaskSchedulerImpl拿到DAGScheduler分配的TaskSet,给task分配资源

作业执行流程(追源码绘制):

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/459040.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构-邻项消除】力扣1003. 检查替换后的词是否有效

给你一个字符串 s ,请你判断它是否 有效 。 字符串 s 有效 需要满足:假设开始有一个空字符串 t “” ,你可以执行 任意次 下述操作将 t 转换为 s : 将字符串 “abc” 插入到 t 中的任意位置。形式上,t 变为 tleft “…

GPT-4o 和 GPT-4 Turbo 模型之间的对比

GPT-4o 和 GPT-4 Turbo 之间的对比 备注 要弄 AI ,不同模型之间的对比就比较重要。 GPT-4o 是 GPT-4 Turbo 的升级版本,能够提供比 GPT-4 Turbo 更多的内容和信息,但成功相对来说更高一些。 第三方引用 在 2024 年 5 月 13 日&#xff0…

115页PPT华为管理变革:制度创新与文化塑造的核心实践

集成供应链(ISC)体系 集成供应链(ISC)体系是英文Integrated Supply Chain的缩写,是一种先进的管理思想,它指的是由相互间提供原材料、零部件、产品和服务的供应商、合作商、制造商、分销商、零售商、顾客等…

TCP simultaneous open测试

源代码 /*************************************************************************> File Name: common.h> Author: hsz> Brief:> Created Time: 2024年10月23日 星期三 09时47分51秒**********************************************************************…

ctfshow(175->178)--SQL注入--联合注入及其过滤

Web175 进入界面: 审计: 查询语句: $sql "select username,password from ctfshow_user5 where username !flag and id ".$_GET[id]." limit 1;";返回逻辑: if(!preg_match(/[\x00-\x7f]/i, json_enc…

可编辑PPT | 柔性制造企业数字化转型与智能工厂建设方案

这份PPT介绍了柔性制造企业在数字化转型和智能工厂建设方面的综合方案。探讨了数据采集、数字孪生、无码开发支撑、数据资产和应用能力层的构建,以及企业信息化的新思路。最终目标是通过这些技术和策略,实现供应链协同、产品全生命周期管理、绿色节能生产…

VUE, element-plus, table分页表格列增加下拉筛选多选框,请求后台

简介 为了方便表格查询时可以筛选列的值,需要给列增加筛选框(多选框),element-plus提供了列的filter字段,但是基于表格数据的筛选,不会重新请求后台,而且当前表格数据有多少个条目,…

WPF+MVVM案例实战(一)- 设备状态LED灯变化实现

文章目录 1、项目创建2、UI界面布局1. MainWindow.xaml2、颜色转换器实现2.MainViewModel.cs 代码实现 3、运行效果4.源代码下载 1、项目创建 打开 VS2022 ,新建项目 Wpf_Examples,创建各层级文件夹,安装 CommunityToolkit.Mvvm 和 Microsof…

python实现投影仪自动对焦

这是一款投影仪,它带有对焦摄像头 它是如何自动对焦的呢? 我们先看一下对焦算法展示效果 说明:左侧是原视频,右侧是对调焦后的视频帧展示,如果下一帧视频比当前帧清晰就会显示下一帧,否则,还是显示当前帧,直至找到更清晰的帧 原理说明: 在投影仪上对焦摄像头就会实…

HelloCTF [RCE-labs] Level 4 - SHELL 运算符

开启靶场,打开链接: 源码很简单,system("ping -c 1 $ip"); GET传参ip 构造payload: /?ip127.0.0.1;ls / /?ip127.0.0.1;cat /flag 成功得到flag: NSSCTF{04ad1d48-4530-481d-aa5d-8a153b0ebf2c}

常见学习陷阱及解决方案

文章目录 1. 拖延2. 信息过载3. 缺乏计划4. 过度依赖记忆5. 缺乏反馈6. 学习环境不佳7. 不够自信8. 不适合的学习方法结论 在学习过程中,学生常常会遇到各种陷阱,这些陷阱可能会影响学习效果和动机。以下是一些常见的学习陷阱及其解决方案: 1…

软硬链接_动静态库

软硬链接 软链接创建 硬链接创建 软链接是独立文件(独立inode号) 硬链接不是独立文件(inode和目标相同) 如何理解软硬链接 软链接有独立inode,软链接内容上,保存的是文件路径 硬链接不是独立文件&#xf…

服务器虚拟化全面教程:从入门到实践

服务器虚拟化全面教程:从入门到实践 引言 在现代 IT 基础设施中,服务器虚拟化已成为一种不可或缺的技术。它不仅能够提高资源利用率,还能降低硬件成本,优化管理流程。本文将深入探讨服务器虚拟化的概念、技术、应用场景及其实现…

初始JavaEE篇——多线程(6):线程池

找往期文章包括但不限于本期文章中不懂的知识点: 个人主页:我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏:JavaEE 到现在为止,我们已经学习了两个经典的多线程案例了:饿汉模式与懒汉模式、阻塞队列与生产者—消费者模型。想要…

static、 静态导入、成员变量的初始化、单例模式、final 常量(Content)、嵌套类、局部类、抽象类、接口、Lambda、方法引用

static static 常用来修饰类的成员:成员变量、方法、嵌套类 成员变量 被static修饰:类变量、成员变量、静态字段 在程序中只占用一段固定的内存(存储在方法区),所有对象共享可以通过实例、类访问 (一般用类名访问和修…

CI/CD 的原理

一、CI/CD 的概念 CI/CD是一种软件开发流程,旨在通过自动化和持续的集成、测试和交付实现高质量的软件产品。 CI(Continuous Integration)持续集成 目前主流的开发方式是协同开发,即多位开发人员同事处理同意应用不同模块或功能。 如果企业在同一时间将…

高效数据集成案例:从聚水潭·奇门到MySQL

聚水潭奇门数据集成到MySQL的技术案例分享 在企业信息化建设中,数据集成是实现业务流程自动化和数据统一管理的关键环节。本文将分享一个具体的系统对接集成案例:如何将聚水潭奇门平台上的销售出库单数据高效、可靠地集成到MySQL数据库中,以…

编译,链接,加载

编译、链接、加载 编译、链接、加载是基础,十几年前从《深入理解计算机系统》等相关书籍中获得了比较全面的理解,现在已经变得有些模糊了。当时没有做总结的习惯,现在零零散散的记一些吧,有时间还要重温书本。 Build time 编译器…

Python(pandas库3)

函数 随机抽样 语法: n:要抽取的行数 frac:抽取的比例,比如 frac0.5,代表抽取总体数据的50% axis:示在哪个方向上抽取数据(axis1 表示列/axis0 表示行) 案例: 输出结果都为随机抽取。 空…

YOLOv8实战野生动物识别

本文采用YOLOv8作为核心算法框架,结合PyQt5构建用户界面,使用Python3进行开发。YOLOv8以其高效的实时检测能力,在多个目标检测任务中展现出卓越性能。本研究针对野生动物数据集进行训练和优化,该数据集包含丰富的野生动物图像样本…