spark 动态资源分配dynamicAllocation

动态资源分配,主要是spark在运行中可以相对合理的分配资源。

  • 初始申请的资源远超实际需要,减少executor
  • 初始申请的资源比实际需要少很多,增多executor
  • Spark运行多个job,这些job所需资源有的多有的少,动态调整executor数量

相关参数

spark.dynamicAllocation.enabled:默认false,设置为true则启用动态资源分配,允许 Spark 根据任务需求自动调整执行器的数量。
spark.shuffle.service.enabled:默认为false,禁用独立的 Shuffle 服务。如果使用动态资源分配,需要设置为true,将Shuffle与Executor分开。
spark.dynamicAllocation.initialExecutors:默认0,初始执行器的数量。
spark.dynamicAllocation.minExecutors:默认0,执行器的最小数量。
spark.dynamicAllocation.maxExecutors:默认Int最大值,执行器的最大数量。
spark.dynamicAllocation.executorAllocationRatio:默认1.0,用于执行器分配的比例,表示给每个应用程序分配的资源相对于集群中所有可用资源的比例。
spark.dynamicAllocation.schedulerBacklogTimeout:默认1s,作业调度队列中作业等待的超时时间。
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout:默认1s,作业调度队列中连续等待的时间阈值。
spark.dynamicAllocation.executorIdleTimeout:默认60s,没有缓存的执行器空闲时自动释放的超时时间。
spark.dynamicAllocation.cachedExecutorIdleTimeout:默认Int最大值,有缓存的空闲执行器的超时时间。

ExecutorAllocationManager

ExecutorAllocationManager是在SparkContext初始化的时候创建的,创建后调用它的start方法。

initializing变量标记ExecutorAllocationManager是否可以进行动态调整。

addTime变量是添加新的executor的时间点

start

在start方法首先注册了两个listener:

  • ExecutorAllocationListener:通知给定的分配管理器何时添加和删除执行器。
  • ExecutorMonitor:执行器活动的监视器,用于检测空闲执行器。

定时调度每100ms执行一次schedule方法。
最后向更新集群发送所需executor的信息。

  • numExecutors:向集群申请的executor数量。集群不一定为了达到这个数量就启动或者杀死executor
  • localityAwareTasks:stage中具有局部首选项的任务数。这包括正在运行、挂起和已完成的任务。有些task是有指定在哪里运行或者哪里不运行的。
  • hostToLocalTaskCount:host和希望在host上运行的task数量。包括正在运行、挂起和已完成的任务。

schedule

调用executorMonitor的timedOutExecutors获取超时的executor。
如有超时的executor,表明executor首次部署成功过,将initializing置为false,标志可以进行动态调整executor数量。
调用updateAndSyncNumExecutorsTarget方法向集群同步executor调度的相关信息,集群收到新的信息后会判断是否满足需求,不满足的话会添加executor。这里集群只可能增加executor来满足目标数量,不会进行kill executor。
最后调用removeExecutors移除超时的executor集合。

updateAndSyncNumExecutorsTarget

首先是调用maxNumExecutorsNeeded方法获取所需executor的最大数量。

  • initializing为true,表明executor首次还没有部署完成,不能动态调整
  • maxNeeded < numExecutorsTarget:此次所需的最大数量比上次申请的executor数量少,此时就要向集群更新executor目标数量,让集群可以停止还没有完成部署的executor的申请
  • addTime != NOT_SET && now >= addTime:到达添加时间,可以申请添加executor
  • 其他情况:没有达到添加时间

maxNumExecutorsNeeded

计算当前任务所需要的最大executor数量。

addExecutors

计算新的executor目标值,每次新增都是加上numExecutorsToAdd变量值。再经过校验调整到合理的值。
如果跟上一次的目标值一致,表示新增executor过程完成了,重置numExecutorsToAdd为1。
向集群发送executor目标值,让集群根据情况调整。
最后调整numExecutorsToAdd方便下一次扩容。
executor新增的速度是 1 2 4 8…,这样做是因为新增速度为固定值会造成目标1.executor数量小,增长速度大,申请了过多的executor;2.目标executor数量大,增长速度小,executor扩容慢。

image.png

removeExecutors

移除executor不能直接将超时的executor都移除了,存活的executor数量还要大于等于executor最小数量、executor目标数量。
executorIdsToBeRemoved是实际需要移除的executor

向集群发送kill executor的命令,更新executor目标数量到集群。最后修改executorMonitor中对应executor状态为待移除,不再进行监控这些executor

onSchedulerBacklogged

当调度程序收到新的待处理任务时调用回调。有挤压任务,添加addTime

  1. stage完成提交,等待task调度
  2. 推测task提交
  3. task执行失败,需要重试执行

onSchedulerQueueEmpty

没有等待执行的task任务,重置addTime

  • stage中task全部完成
  • task开始,pending的task数量为0

ExecutorAllocationListener

可以简单看一下相关变量,只要是记录stage和task的关系(task总量,运行的task数量,pending的task数量,运行的推测task数量,pending的推测task数量。。。)
它是是一个listener,主要是监听了stage和task相关事件

  • SparkListenerStageSubmitted
  • SparkListenerStageCompleted
  • SparkListenerTaskStart
  • SparkListenerTaskEnd
  • SparkListenerSpeculativeTaskSubmitted


根据上面的变量,获取running和pending任务量

onStageSubmitted

stage提交完成,将initializing置为false。更新相关变量。

onStageCompleted

stage完成,修改相关变量。如果这个stage是最后一个stage,表明没有任务需要执行,就调用onSchedulerQueueEmpty将addTime、numExecutorsToAdd重置。
image.png

onTaskStart

task开始执行,更新相关变量。如果处于pending状态的task数量为0,调用onSchedulerQueueEmpty重置executor新增相关变量。

onTaskEnd

task执行结束,更新相关变量。

onSpeculativeTaskSubmitted

推测任务提交,更新相关变量。实际task数量增加,调用onSchedulerBacklogged进行新的调度。

ExecutorMonitor

ExecutorMonitor监听executor相关事件,使用Tracker记录executor的信息,可以返回超时的executor信息。
executors:executor信息的集合
nextTimeout:下一次超时的时间
timedOutExecs:超时的executor集合

timedOutExecutors

遍历executor的tracker,获取超时的executor。最后更新下一次超时时间。
newNextTimeout下一次超时时间是所有executor中最近的超时时间

updateNextTimeout

更新nextTimeout

executorsKilled

是ExecutorAllocationManager在移除executor的时候调用,这里是标记executor为待移除,不是真的移除。真的移除是监听SparkListenerExecutorRemoved事件

监听相关的方法

基本都是更新相关的变量

Tracker

记录executor信息
主要变量:
timeoutAt:超时时间
idleStart:executor空闲开始时间
cachedBlocks:缓存的block

updateTimeout

获取timeout,不含cache和shuffle的就是idleTimeoutNs,有cacje和shuffle的时候还要计算cache和shuffle的超时时间。
调用ExecutorMonitor的updateNextTimeout更新下一次超时时间nextTimeout

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/379953.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微信小程序 button样式设置为图片的方法

微信小程序 button样式设置为图片的方法 background-image background-size与background-repeat与border:none;是button必须的 <view style" position: relative;"><button class"customer-service-btn" style"background-image: url(./st…

Python 合并两个有序数组

Python 合并两个有序数组 正文 正文 题目说明如下&#xff1a; 这里我们直接让 nums1 的后 n 个数等于 nums2 数组&#xff0c;然后对 nums1 数组整体进行排序即可。 class Solution:def merge(self, nums1: List[int], m: int, nums2: List[int], n: int) -> None:"…

云原生系列 - Jenkins

Jenkins Jenkins&#xff0c;原名 Hudson&#xff0c;2011 年改为现在的名字。它是一个开源的实现持续集成的软件工具。 官方网站&#xff08;英文&#xff09;&#xff1a;https://www.jenkins.io/ 官方网站&#xff08;中文&#xff09;&#xff1a;https://www.jenkins.io…

网站开发:使用VScode安装yarn包和运行前端项目

一、首先打开PowerShell-管理员身份运行ISE 输入命令&#xff1a; set-ExecutionPolicy RemoteSigned 选择“全是”&#xff0c;表示允许在本地计算机上运行由本地用户创建的脚本&#xff0c;没有报错就行了 二、接着打开VScode集成终端 输入 npm install -g yarn 再次输入以…

[CP_AUTOSAR]_分层软件架构_接口之通信模块交互介绍

目录 1、协议数据单元(PDU)传输2、通信模块的案例2.1、SDU、 PCI & PDU2.2、通信模块构成2.3、从数据传输的角度看Communication2.4、Communication中的接口 在前面 《关于接口的一些说明》 以及  《Memory软件模块接口说明》 中&#xff0c;简要介绍了CP_AUTOSAR分层…

scp免密复制文件

实现在服务器A和服务器B之间使用scp命令免密互相传输文件 1. 在服务器A中免密复制到服务器B 1.1 生成服务器A的公钥私钥 #在服务器A中执行 ssh-keygen -t rsa -P ""命令执行完毕会在服务器A的 ~/.ssh 目录下生成两个文件&#xff1a;id_rsa 和 id_rsa.pub 1.2 拷…

nodejs下载+react安装

一、nodejs安装 1、nodejs下载 具体安装可参考连接&#xff1a;2023最新版Node.js下载安装及环境配置教程&#xff08;非常详细&#xff09;从零基础入门到精通&#xff0c;看完这一篇就够了_nodejs安装及环境配置-CSDN博客 下载地址&#xff1a;Node.js — 下载 Node.js 测…

web安全之SQL手工注入漏洞测试

一、目的 1.掌握SQL注入原理&#xff1b; Sql注入详解(原理篇)_sql注入攻击的原理-CSDN博客 2.了解手工注入的方法&#xff1b; 3.了解MySQL的数据结构&#xff1b; 4.了解字符串的MD5加解密 二、过程 1.进去后出现以下界面 找注入点 发现有注入点&#xff0c;即id被代入执…

AutoMQ 生态集成 Redpanda Console

通过 Kafka Web UI 更加便利地管理 Kafka/AutoMQ 集群 随着大数据技术的飞速发展&#xff0c;Kafka 作为一种高吞吐量、低延迟的分布式消息系统&#xff0c;已经成为企业实时数据处理的核心组件。然而&#xff0c;Kafka 集群的管理和监控却并非易事。传统的命令行工具和脚本虽…

【分布式事务】怎么解决分布式场景下数据一致性问题

分布式事务的由来 拿充值订单举个栗子吧&#xff0c;假设&#xff1a;原本订单模块和账户模块是放在一起的&#xff0c;现在需要做服务拆分&#xff0c;拆分成订单服务&#xff0c;账户余额服务。原本收到充值回调后&#xff0c;可以将修改订单状态和扣减余额放在一个mysql事务…

未来互联网的新篇章:深度解析Web3技术

随着技术的飞速发展&#xff0c;Web3作为新一代互联网技术范式&#xff0c;正在重新定义我们对互联网的认知和使用方式。本文将深入探讨Web3技术的核心概念、关键特征以及其在未来互联网发展中的潜力和影响&#xff0c;为读者打开Web3时代的大门。 Web3技术的核心概念和特征 1…

国内微短剧系统平台抖音微信付费小程序app开发源代码交付

微短剧作为当下热门的内容&#xff0c;结合抖音平台的广泛用户基础&#xff0c;开发微短剧付费小程序APP具有显著的市场潜力&#xff0c;用户对于短剧内容的需求旺盛&#xff0c;特别是在言情、总裁、赘婿等热门题材方面&#xff0c;接下来给大家普及一下微短剧小程序系统。 顺…

8、添加第三方包

目录 1、安装Django Debug Toolbar Django的一个优势就是有丰富的第三方包生态系统。这些由社区开发的包&#xff0c;可以用来快速扩展应用程序的功能集 1、安装Django Debug Toolbar Django Debug Toolbar位于名列前三的第三方包之一 这是一个用于调试Debug Web应用程序的有…

mybatis的xml中,where标签不自动删除多余的and之类的问题

遇到了这个莫名其妙的问题&#xff0c;起初是很疑惑的&#xff0c;where标签好像失灵了一般不会自动删除掉 多余的and 看了眼sql语句&#xff0c;发现还是有and没被删除。 后来重新写了遍后发现又没事了。真的是神人。 然后就研究了好一会&#xff0c;发现&#xff01;&#…

Hive理论讲解

Hive介绍 1、Hive本质 Hive本质是【数仓设计方案】&#xff0c;hive本身并不存储数据【数据包含&#xff1a;元数据 (表)数据】。 2、hql和sql对比 sql 结构化查询语言【structured query language】hql hive/hadoop类sql查询语言【hive/hadoop query language like sql…

零基础入门:创建一个简单的Python爬虫管理系统

摘要&#xff1a; 本文将手把手教你&#xff0c;从零开始构建一个简易的Python爬虫管理系统&#xff0c;无需编程基础&#xff0c;轻松掌握数据抓取技巧。通过实战演练&#xff0c;你将学会设置项目、编写基本爬虫代码、管理爬取任务与数据&#xff0c;为个人研究或企业需求奠…

Langchain-Chatchat3.1版本docker部署流程——知识库问答

Langchain——chatchat3.1版本docker部署流程Langchain-Chatchat 1. 项目地址 #项目地址 https://github.com/chatchat-space/Langchain-Chatchat #dockerhub地址 https://hub.docker.com/r/chatimage/chatchat/tags2. docker部署 参考官方文档 #官方文档 https://github.c…

百日筑基第二十四天-23种设计模式-结构型总汇

百日筑基第二十四天-23种设计模式-结构型总汇 前言 设计模式可以说是对于七大设计原则的实现。 总体来说设计模式分为三大类&#xff1a; 创建型模式&#xff0c;共五种&#xff1a;单例模式、简单工厂模式、抽象工厂模式、建造者模式、原型模式。结构型模式&#xff0c;共…

像 MvvmLight 一样使用 CommunityToolkit.Mvvm 工具包

文章目录 简介一、安装工具包二、实现步骤1.按照MvvmLight 的结构创建对应文件夹和文件2.编辑 ViewModelLocator3.引用全局资源二、使用详情1.属性2.命令3. 消息通知4. 完整程序代码展示运行结果简介 CommunityToolkit.Mvvm 包(又名 MVVM 工具包,以前称为 Microsoft.Toolkit…

pycharm2024破解

pycharm下载&#xff1a; Download PyCharm: The Python IDE for data science and web development by JetBrainshttps://www.jetbrains.com/pycharm/download/?sectionwindowspython3.12.4下载&#xff1a; https://www.python.org/ftp/python/3.12.4/python-3.12.4-amd64…