Spark RDD的分区与依赖关系

Spark RDD的分区与依赖关系

RDD分区

RDD,Resiliennt Distributed Datasets,弹性式分布式数据集,是由若干个分区构成的,那么这每一个分区中的数据又是如何产生的呢?这就是RDD分区策略所要解决的问题,下面我们就一道来学习RDD分区相关。

RDD数据分区

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数。

分区的决定,就是在宽依赖的过程中才有,窄依赖因为是一对一,分区确定的,所以不需要指定分区操作。

1)Partitioner:在Spark中涉及RDD的分区策略的抽象类为Partitioner,其继承体系如图-27所示,有两个核心的子类实现,一个HashPartitioner,一个RangePartitioner。

图-27 spark Partitioner继承体系

Spark中数据分区的主要工具类(数据分区类),主要用于Spark底层RDD的数据重分布的情况中,主要方法两个,如图-28所示:

图-28 Partitioner抽象类

2)HashPartitioner:Spark中非常重要的一个分区器,也是默认分区器,默认用于90%以上的RDD相关API上。

功能:依据RDD中key值的hashCode的值将数据取模后得到该key值对应的下一个RDD的分区id值,支持key值为null的情况,当key为null的时候,返回0;该分区器基本上适合所有RDD数据类型的数据进行分区操作;但是需要注意的是,由于JAVA中数组的hashCode是基于数组对象本身的,不是基于数组内容的,所以如果RDD的key是数组类型,那么可能导致数据内容一致的数据key没法分配到同一个RDD分区中,这个时候最好自定义数据分区器,采用数组内容进行分区或者将数组的内容转换为集合。HashPartitioner代码如下:

def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
//加载数据
val rdd = sc.parallelize(List((1,3),(1,2),(2,4),(2,3),(3,6),(3,8)),8)
//通过Hash分区
val result: RDD[(Int, Int)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
//获取分区方式
println(result.partitioner)
//获取分区数
println(result.getNumPartitions)
}

RDD自定义分区

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合我们的需求,这时候我们就可以自定义分区策略。

要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

1)numPartitions: Int:返回创建出来的分区数。

2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

3)equals():Java判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。

案例一:模拟实现HashPartitioner。

class CustomerPartitoner(numPartiton:Int) extends Partitioner{// 返回分区的总数override def numPartitions: Int = numPartiton// 根据传入的Key返回分区的索引override def getPartition(key: Any): Int = {key.hashCode()%numparts}}object CustomerPartitoner {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setAppName("CustomerPartitoner").setMaster("local[*]")val sc = new SparkContext(sparkConf)//zipWithIndex该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。val rdd = sc.parallelize(0 to 10,1).zipWithIndex()val func = (index:Int,iter:Iterator[(Int,Long)]) =>{iter.map(x => "[partID:"+index + ", value:"+x+"]")}val r = rdd.mapPartitionsWithIndex(func).collect()for (i <- r){println(i)}val rdd2 = rdd.partitionBy(new CustomerPartitoner(5))val r1 = rdd2.mapPartitionsWithIndex(func).collect()println("----------------------------------------")for (i <- r1){println(i)}println("----------------------------------------")sc.stop()}}

总结:

1)分区主要面对KV结构数据,Spark内部提供了两个比较重要的分区器,Hash分区器和Range分区器。

2)hash分区主要通过key的hashcode来对分区数求余,hash分区可能会导致数据倾斜问题,Range分区是通过水塘抽样的算法来将数据均匀的分配到各个分区中。

3)自定义分区主要通过继承partitioner抽象类来实现,必须要实现两个方法:numPartitions 和 getPartition(key: Any)。

RDD依赖关系

RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。

依赖关系

图-29是源码中的一张图,可以发现一个问题Dependency(依赖)的意思可以发现ShuffleDependency是其子类(即宽依赖),NarrowDependency是其子类(即窄依赖)。

图-29 Dependency体系

1)宽窄依赖:所谓窄依赖,指的是子RDD一个分区中的数据,来自于上游RDD中一个分区。所谓宽依赖,指的是子RDD一个分区中的数据,来自于上游RDD所有的分区。

宽窄依赖关系示例如图-30所示:

图-30 宽窄依赖示例图

2)血统Lineage:RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。关于linage说明示意图如图-31所示:

图-31 lineage示例图

3)DAG有向无环图:如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图。有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树均为有向无环图。通俗的来说就是有方向,没有回流的图可以称为有向无环图,示意图如图-32所示。

图-32 有像无环图

4)RDD任务的切分:对于RDD的任务切分,可以很形象的如图-33所示。

图-33 RDD任务的切分

并行度:程序同一时间执行作业的线程个数。

原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,如图-34所示。

图-34 RDD stage的切分

对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的重要依据。Stage阶段计算过程如图所示-35所示。

图-35 RDD stage阶段计算过程

任务生成和提交的四个阶段

Spark任务生产和提交的四个步骤可以归纳如下:

1)构建DAG:用户提交的job将首先被转换成一系列RDD并通过RDD之间的依赖关系构建DAG,然后将DAG提交到调度系统。

DAG描述多个RDD的转换过程,任务执行时,可以按照DAG的描述,执行真正的计算。

DAG是有边界的:开始(通过sparkcontext创建的RDD),结束(触发action,调用runjob就是一个完整的DAG形成了,一旦触发action,就形成了一个完整的DAG)。

一个RDD描述了数据计算过程中的一个环节,而一个DAG包含多个RDD,描述了数据计算过程中的所有环节。

一个spark application可以包含多个DAG,取决于具体有多少个action。

2)DAGScheduler:将DAG切分stage(切分依据是shuffle),将stage中生成的task以taskset的形式发送给TaskScheduler

为什么要切分stage?

一个是复杂业务逻辑(将多台机器上具有相同属性的数据聚合到一台机器上:shuffle)。

如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,下一个阶段的计算依赖上一个阶段的数据。

在同一个stage中,会有多个算子,可以合并到一起,我们很难称其为pipeline(流水线,严格按照流程、顺序执行)。

3)TaskScheduler:调度task(根据资源情况将task调度到Executors).

4)Executors:接收task,然后将task交给线程池执行。

具体可以简化为如图-38所示。

图-38 spark任务生成和提交图

排序

TopN

topN就是上述sortBy/sortByKey之后执行action操作take(N),或者直接takeOrderd(N),建议使用后者,效率高于前者。具体操作省略。

二次排序

所谓二次排序,指的是排序字段不唯一,有多个,共同排序,仍然使用上面的数据,对学生的身高和年龄一次排序。

object SecondSortOps {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName(s"${SecondSortOps.getClass.getSimpleName}").setMaster("local[2]")val sc = new SparkContext(conf)//sortByKey 数据类型为k-v,且是按照key进行排序val personRDD:RDD[Person] = sc.parallelize(List(Person(1, "吴轩宇", 19, 168),Person(2, "彭国宏", 18, 175),Person(3, "随国强", 18, 176),Person(4, "闫  磊", 20, 180),Person(5, "王静轶", 18, 168)))personRDD.map(stu => (stu, null)).sortByKey(true, 1).foreach(p => println(p._1))sc.stop()}}case class Person(id:Int, name:String, age:Int, height:Double) extends Ordered[Person] {//对学生的身高和年龄依次排序override def compare(that: Person) = {var ret = this.height.compareTo(that.height)if(ret == 0) {ret = this.age.compareTo(that.age)}ret}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/315950.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言基础:初识指针(二)

当你不知道指针变量初始化什么时&#xff0c;可以初始化为空指针 int *pNULL; 我们看NULL的定义&#xff0c;可以看出NULL是0被强制转化为Void* 类型的0&#xff1b;实质还是个0&#xff1b; 如何避免野指针&#xff1a; 1. 指针初始化 2. 小心指针越界 3. 指针指向空间…

springboot笔记一:idea社区版本创建springboot项目的方式

社区idea 手动maven 创建springboot项目 创建之后修改pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:sc…

苹果发布开源模型;盘古大模型5.0将亮相;英伟达将收购 Run:ai

苹果首次发布开源语言模型 近期&#xff0c;苹果在 Hugging Face 发布了 OpenELM 系列模型。OpenELM 的关键创新是逐层扩展策略&#xff0c;该策略可在 transformer 模型的每一层中有效地分配参数&#xff0c;从而提高准确性。 与具有统一参数分配的传统语言模型不同&#xff…

elasticsearch-8.1.0安装记录

目录 零、版本说明一、安装二、使用客户端访问 零、版本说明 centos [rootnode1 ~]# cat /etc/redhat-release CentOS Linux release 7.9.2009 (Core)elasticsearch elasticsearch-8.1.0-linux-x86_64一、安装 systemctl stop firewalld.servicesystemctl disable firewal…

ubuntu开启message文件

环境&#xff1a;ubuntu 20.04 1、首先需要修改 /etc/rsyslog.d/50-default.conf 文件&#xff1b;源文件中message被注释&#xff0c;如下图&#xff1a; 2、打开注释&#xff1a; 3、重启服务 systemctl restart rsyslog.service 如此即可&#xff01;

element -ui 横向时间轴,时间轴悬浮对应日期

效果&#xff1a; <el-tabs v-model"activeName" type"card" tab-click"handleClick"><el-tab-pane label"周期性巡视" name"zqxxs" key"zqxxs" class"scrollable-tab-pane"><div v-if…

Spring Kafka—— KafkaListenerEndpointRegistry 隐式注册分析

由于我想在项目中实现基于 Spring kafka 动态连接 Kafka 服务&#xff0c;指定监听 Topic 并控制消费程序的启动和停止这样一个功能&#xff0c;所以就大概的了解了一下 Spring Kafka 的几个重要的类的概念&#xff0c;内容如下&#xff1a; ConsumerFactory 作用&#xff1a;…

【Linux】信号的产生

目录 一. 信号的概念signal() 函数 二. 信号的产生1. 键盘发送2. 系统调用kill()raise()abort() 3. 软件条件alarm() 4. 硬件异常除零错误:野指针: 三. 核心转储 一. 信号的概念 信号是消息的载体, 标志着不同的行为; 是进程间发送异步信息的一种方式, 属于软中断. 信号随时都…

PNPM - nodejs 包管理

文章目录 一、关于 PNPM开发动机1、节省磁盘空间2、提升安装速度3、创建一个 non-flat node_modules 文件夹 二、安装通过 npm 安装 pnpm通过 Homebrew 安装 pnpm 三、pnpm CLI1、与 npm 的差异2、参数-C <path>, --dir <path>-w, --workspace-root 3、命令4、环境…

免费语音转文字:自建Whisper,贝锐花生壳3步远程访问

Whisper是OpenAI开发的自动语音识别系统&#xff08;语音转文字&#xff09;。 OpenAI称其英文语音辨识能力已达到人类水准&#xff0c;且支持其它98中语言的自动语音辨识&#xff0c;Whisper神经网络模型被训练来运行语音辨识与翻译任务。 此外&#xff0c;与其他需要联网运行…

Blender曲线操作

1.几种常见建模方式 -多边形建模&#xff1a;Blender&#xff0c;C4D&#xff0c;3DsMax&#xff0c;MaYa -曲线&#xff1a; -曲面&#xff1a;Rhino&#xff08;Nurbs&#xff09; -雕刻&#xff1a;Blender&#xff0c;ZBrush -蜡笔&#xff1a;Blender 1&#xff09;新…

windows11家庭版开启Hyper-v

前提&#xff1a;如果在控制面板中-->程序和功能-->启用和关闭windows功能-->没有Hyper-v 1.什么是Hyper-v&#xff1f; Hyper-v分为两个部分&#xff1a;底层的虚拟机平台、上层的虚拟机管理软件 2.Hyper-v安装 2.1新建hyper.cmd文件&#xff0c;写入下面的内容&…

物联网:从电信物联开发平台AIoT获取物联设备上报数据示例

设备接入到电信AIoT物联平台后&#xff0c;可以在平台上查询到设备上报的数据。 下面就以接入的NBIOT物联远传水表为例。 在产品中选择指定设备&#xff0c;在数据查看中可以看到此设备上报的数据。 示例中这组数据是base64位加密的&#xff0c;获取后还需要转换解密。 而我…

Linux软件包管理器——yum

文章目录 1.什么是软件包1.1安装与删除命令1.2注意事项1.3查看软件包1.3.1注意事项&#xff1a; 2.关于rzsz3.有趣的Linux下的指令 -sl 1.什么是软件包 在Linux下安装软件, 一个通常的办法是下载到程序的源代码, 并进行编译, 得到可执行程序. 但是这样太麻烦了, 于是有些人把一…

操作系统安全:Windows与Linux的安全标识符,身份鉴别和访问控制

「作者简介」:2022年北京冬奥会网络安全中国代表队,CSDN Top100,就职奇安信多年,以实战工作为基础对安全知识体系进行总结与归纳,著作适用于快速入门的 《网络安全自学教程》,内容涵盖系统安全、信息收集等12个知识域的一百多个知识点,持续更新。 操作系统有4个安全目标…

Unity Meta Quest MR 开发(七):使用 Stencil Test 模板测试制作可以在虚拟与现实之间穿梭的 MR 传送门

文章目录 &#x1f4d5;教程说明&#x1f4d5;Stencil Test 模板测试&#x1f4d5;Stencil Shader&#x1f4d5;使用 Unity URP 渲染管线设置模板测试⭐Render Pipeline Asset 与 Universal Renderer Data⭐删除场景中的天空盒⭐设置虚拟世界的层级 Layer⭐设置模板测试 &#…

大数据运维之数据质量管理

第1章 数据质量管理概述 1.1 数据质量管理定义 数据质量管理&#xff08;Data Quality Management&#xff09;&#xff0c;是指对数据从计划、获取、存储、共享、维护、应用、消亡生命周期的每个阶段里可能引发的各类数据质量问题&#xff0c;进行识别、度量、监控、预警等一…

使用工具速记

文章目录 一、sqlyoy登录账号信息迁移二、idea导入之前的已配置的idea信息三、设置windows UI大小四、其他 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、sqlyoy登录账号信息迁移 工具(sqlyog上面菜单栏)->导入导出详情->选择要导出的账号…

Graph Neural Networks(GNN)学习笔记

本学习笔记的组织结构是&#xff0c;先跟李沐老师学一下&#xff0c;再去kaggle上寻摸一下有没有类似的练习&#xff0c;浅做一下&#xff0c;作为一个了解。 ———————————0428更新—————————————— 课程和博客看到后面准备主要看两个&#xff1a;GCN和…

jvm知识点总结(二)

Java8默认使用的垃圾收集器是什么? Java8版本的Hotspot JVM,默认情况下使用的是并行垃圾收集器&#xff08;Parallel GC&#xff09; 如果CPU使用率飙升&#xff0c;如何排查? 1.先通过top定位到消耗最高的进程id 2.执行top -h pid单独监控该进程 3.在2中输入H&#xff…