Spark 之 Cache

数据本地性
object TaskLocality extends Enumeration {// Process local is expected to be used ONLY within TaskSetManager for now.val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Valuetype TaskLocality = Valuedef isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {condition <= constraint}
}
  • PROCESS_LOCAL:

要处理的数据在同一个本地进程,
即数据和Task在同一个Excutor JVM中。
这种情况是RDD的数据经过缓存,此时不需要网络传输,是最优locality。(但是数据要先缓存)。

  • NODE_LOCAL:

(1)数据和Task在同一节点上的不同executor中;
(2)数据HDFS和Task在同一个结点上,
此时需要进行进程间进行传输,速度比PROCESS_LOCAL略慢。

  • NO_PREF:

数据从哪访问都一样,相当于没有数据本地性,一般值从外部数据源读取数据。

  • RACK_LOCAL:

数据与Task在同机架的不同节点,此时需要通过网络传输,速度比NODE_LOCAL慢。

  • ANY:

数据和Task可能在集群的任何地方,性能最差,一般出现这种情况就该排查原因了

TaskSetManager

TaskSetManager.scala

  /** Add a task to all the pending-task lists that it should be on. */private[spark] def addPendingTask(index: Int,resolveRacks: Boolean = true,speculatable: Boolean = false): Unit = {// A zombie TaskSetManager may reach here while handling failed task.if (isZombie) returnval pendingTaskSetToAddTo = if (speculatable) pendingSpeculatableTasks else pendingTasksfor (loc <- tasks(index).preferredLocations) {loc match {case e: ExecutorCacheTaskLocation =>pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += indexcase e: HDFSCacheTaskLocation =>val exe = sched.getExecutorsAliveOnHost(loc.host)exe match {case Some(set) =>for (e <- set) {pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index}logInfo(s"Pending task $index has a cached location at ${e.host} " +", where there are executors " + set.mkString(","))case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +", but there are no executors alive there.")}case _ =>}pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += indexif (resolveRacks) {sched.getRackForHost(loc.host).foreach { rack =>pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index}}}if (tasks(index).preferredLocations == Nil) {pendingTaskSetToAddTo.noPrefs += index}pendingTaskSetToAddTo.all += index}
TaskLocation

TaskLocation.scala

/*** A location that includes both a host and an executor id on that host.*/
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)extends TaskLocation {override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}/*** A location on a host.*/
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {override def toString: String = host
}/*** A location on a host that is cached by HDFS.*/
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {override def toString: String = TaskLocation.inMemoryLocationTag + host
}
private[spark] object TaskLocation {// We identify hosts on which the block is cached with this prefix.  Because this prefix contains// underscores, which are not legal characters in hostnames, there should be no potential for// confusion.  See  RFC 952 and RFC 1123 for information about the format of hostnames.val inMemoryLocationTag = "hdfs_cache_"// Identify locations of executors with this prefix.val executorLocationTag = "executor_"

按照PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY的顺序进行调度task。

getCacheLocs

DAGScheduler.scala

  private[scheduler]def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {cacheLocs.synchronized {// Note: this doesn't use `getOrElse()` because this method is called O(num tasks) timesif (!cacheLocs.contains(rdd.id)) {// Note: if the storage level is NONE, we don't need to get locations from block manager.val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {IndexedSeq.fill(rdd.partitions.length)(Nil)} else {val blockIds =rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]blockManagerMaster.getLocations(blockIds).map { bms =>bms.map(bm => TaskLocation(bm.host, bm.executorId))}}cacheLocs(rdd.id) = locs}cacheLocs(rdd.id)}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/473196.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu 安装kafka-eagle

上传压缩包 kafka-eagle-bin-2.0.8.tar.gz 到集群 /root/efak 目录 cd /root/efak tar -zxvf kafka-eagle-bin-2.0.8.tar.gz cd /root/efak/kafka-eagle-bin-2.0.8 mkdir /root/efakmodule tar -zxvf efak-web-2.0.8-bin.tar.gz -C /root/efakmodule/ mv /root/efakmodule/efak…

Zotero 7本地pdf文件名自适应中英文格式

问题 Zotero7默认语言是中文&#xff0c;发现本地pdf文献中均会出现“等”字&#xff0c;出现中英文不统一的不便。 &#xff08;注&#xff1a;存在et al.的pdf&#xff0c;是从外部直接拖进去的&#xff0c;不是自动产生的。&#xff09; 解决 zotero 7提供了丰富的文件后…

Redis性能优化——针对实习面试

目录 Redis性能优化什么是bigkey&#xff1f;bigkey的危害&#xff1f;如何处理bigkey?什么是hotkey&#xff1f;hotkey的危害&#xff1f;如何处理hotkey&#xff1f;如何处理大量key集中过期问题&#xff1f;什么是内存碎片&#xff1f;为什么会有Redis内存碎片&#xff1f;…

牛客挑战赛77

#include <iostream>// 函数 kXOR&#xff1a;计算两个数在 k 进制下的异或和 // 参数&#xff1a; // a: 第一个正整数 // b: 第二个正整数 // k: 进制基数 // 返回值&#xff1a; // 两数在 k 进制下的异或和&#xff08;十进制表示&#xff09; long long kXO…

开源共建 | 长安链开发常见问题及规避

长安链开源社区鼓励社区成员参与社区共建&#xff0c;参与形式包括不限于代码贡献、文章撰写、社区答疑等。腾讯云区块链王燕飞在参与长安链测试工作过程中&#xff0c;深入细致地总结了长安链实际开发应用中的常见问题及其有效的规避方法&#xff0c;相关内容多次解答社区成员…

EWM 打印

目录 1 简介 2 后台配置 3 主数据 4 业务操作 1 简介 打印即输出管理&#xff08;output management&#xff09;利用“条件表”那一套理论实现。而当打印跟 EWM 集成到一起时&#xff0c;也需要利用 PPF&#xff08;Post Processing Framework&#xff09;那一套理论。而…

LLaMA-Factory全流程训练模型

&#x1f917;本文主要讲述在docker下使用LLaMA-Factory训练推理模型。 &#x1fae1;拉取镜像 首先需要启动docker&#xff0c;然后在终端中输入&#xff1a; docker run -tid --gpus all -p 8000:8000 --name LLM -e NVIDIA_DRIVER_CAPABILITIEScompute,utility -e NVIDIA…

WebSocket简易聊天室实现(有详细解释)

完整代码 Arata08/online-chat-demo 服务端: 1.编写配置类&#xff0c;扫描有 ServerEndpoint 注解的 Bean import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.s…

Excel超级处理器:高效实现2种批量生成二维码方式

在Excel数据处理中&#xff0c;二维码的批量生成是一个常见且重要的需求。借助Excel超级处理器这一强大的插件&#xff0c;用户可以轻松实现二维码的两种主要批量生成方式&#xff1a;直接在单元格中显示二维码图片&#xff0c;以及直接生成二维码图片并保存在文件夹中。超级处…

Linux Android 正点原子RK3568替换开机Logo完整教程

0.这CSDN是有BUG吗?大家注意:表示路径的2个点号全都变成3个点号啦! 接下来的后文中,应该是2个点都被CSDN变成了3个点: 1.将这两个 bmp 图片文件720x1280_8bit拷贝到内核源码目录下,替换内核源码中默认的 logo 图片。注意:此时还缺少电量显示图片 2.编译内核 make d…

性能高于Transformer模型1.7-2倍,彩云科技发布基于DCFormer架构通用大模型云锦天章

2017年&#xff0c;谷歌发布《Attention Is All You Need》论文&#xff0c;首次提出Transformer架构&#xff0c;掀开了人工智能自然语言处理&#xff08;NLP&#xff09;领域发展的全新篇章。Transformer架构作为神经网络学习中最重要的架构&#xff0c;成为后来席卷全球的一…

函数指针示例

目录&#xff1a; 代码&#xff1a; main.c #include <stdio.h> #include <stdlib.h>int Max(int x, int y); int Min(int x, int y);int main(int argc, char**argv) {int x,y;scanf("%d",&x);scanf("%d",&y);int select;printf(&q…

【书生大模型实战营 闯关材料】入门岛:第4关 玩转HF/魔搭/魔乐社区

2.1.2-2.1.3 InternLM 模型下载 模型下载 使用Hugging Face平台、魔搭社区平台&#xff08;可选&#xff09;和魔乐社区平台&#xff08;可选&#xff09;下载文档中提到的模型&#xff08;至少需要下载config.json文件、model.safetensors.index.json文件&#xff09;&#x…

Android - Pixel 6a 手机OS 由 Android 15 降级到 Android 14 操作记录

Pixel 6a 手机由 Android 14 升级到 Android 15了&#xff0c;但是由于一些原因又想降级回 Android 14&#xff0c; 能降吗&#xff1f;该怎么降级呢&#xff1f;本篇文章来记述实际操作过程&#xff0c;希望能给想做相同操作的人一些帮助。 答案当然是能降&#xff0c;而且我…

python-文件内容操作

文章目录 文件的介绍文件的理解文件操作基本知识文件对象属性与常用方法文件的读取文件的写入**上下文管理语句 with****读CSV文件**二维数据的存储从CSV格式的文件中读取数据将数据写入CSV格式的文件 读取Excel格式数据文件(pandas库)读取Excel格式数据文件(pandas库) 文件的介…

《操作系统 - 清华大学》3 -3:连续内存分配:内存碎片与分区的动态分配

文章目录 0. 概述1. 内存碎片问题2. 动态分配3. 首次适配算法4. 最优适配算法5. 最差适配算法 0. 概述 内存分配是操作系统管理过程中很重要的环节&#xff0c;首先需要考虑的是一块连续区域分配的过程&#xff0c;这个过程中会有很多问题&#xff0c;首先比较关注的一个问题是…

7.高可用集群架构Keepalived双主热备原理

一. 高可用集群架构Keepalived双主热备原理 (1)主机+备机keepalived配置(192.168.1.171) ! Configuration File for keepalivedglobal_defs {# 路由id:当前安装keepalived节点主机的标识符,全局唯一router_id keep_101 } #计算机节点(主机配置) vrrp_instance VI_1 {</

Notepad++的完美替代

由于Notepad的作者曾发表过可能在开发者代码中植入恶意软件的言论&#xff0c;他备受指责。在此&#xff0c;我向大家推荐一个Notepad的完美替代品——NotepadNext和Notepad--。 1、NotepadNext NotepadNext的特点&#xff1a; 1、跨平台兼容性 NotepadNext基于Electron或Qt…

【Chapter 3】Machine Learning Classification Case_Prediction of diabetes-XGBoost

文章目录 1、XGBoost Algorithm2、Comparison of algorithm implementation between Python code and Sentosa_DSML community edition(1) Data reading and statistical analysis(2)Data preprocessing(3)Model Training and Evaluation(4)Model visualization 3、summarize 1…

Linux(CentOS)安装达梦数据库 dm8

CentOS版本&#xff1a;CentOS 7&#xff0c;查看操作系统版本信息&#xff0c;请查阅 查看Linux内核版本信息 达梦数据库版本&#xff1a;dm8 一、获取 dm8 安装文件 1、下载安装文件 打开达梦官网&#xff1a;https://www.dameng.com/ 下载的文件 解压后的文件 2、上传安…