大数据-91 Spark 集群 RDD 编程-高阶 RDD广播变量 RDD累加器 Spark程序优化

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • RDD容错机制
  • RDD分区机制
  • RDD分区器
  • RDD自定义分区器

在这里插入图片描述

广播变量

基本介绍

有时候需要在多个任务之间共享变量,或者在任务(Task)和 Driver Program 之间共享变量。
为了满足这个需求,Spark提供了两种类型的变量。

  • 广播变量(broadcast variable)
  • 累加器(accumulators)
    广播变量、累加器的主要作用是为了优化Spark程序。

广播变量将变量在节点的Executor之间进行共享(由Driver广播),广播变量用来高效分发较大的对象,向所有工作节点(Executor)发送一个较大的只读值,以供一个或多个操作使用。

使用广播变量的过程如下:

  • 对一个类型T的对象调用SparkContext.broadcast创建一个Broadcast[T]对象,任何可序列化的类型都可以这么实现(在Driver端)
  • 通过Value属性访问该对象的值(Executor中)
  • 变量只会被分到各个Executor一次,作为只读值处理

在这里插入图片描述
广播变量的相关参数:

  • spark.broadcast.blockSize(缺省值: 4m)
  • spark.broadcast.checksum(缺省值:true)
  • spark.broadcast.compree(缺省值:true)

变量应用

普通JOIN

在这里插入图片描述

MapSideJoin

在这里插入图片描述

生成数据 test_spark_01.txt

1000;商品1
1001;商品2
1002;商品3
1003;商品4
1004;商品5
1005;商品6
1006;商品7
1007;商品8
1008;商品9

生成数据格式如下:
在这里插入图片描述

生成数据 test_spark_02.txt

10000;订单1;1000
10001;订单2;1001
10002;订单3;1002
10003;订单4;1003
10004;订单5;1004
10005;订单6;1005
10006;订单7;1006
10007;订单8;1007
10008;订单9;1008

生成的数据格式如下:
在这里插入图片描述

编写代码1

我们编写代码进行测试

package icu.wzkimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object JoinDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("JoinDemo").setMaster("local[*]")val sc = new SparkContext(conf)sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)val productRDD: RDD[(String, String)] = sc.textFile("data/test_spark_01.txt").map {line => val fields = line.split(";")(fields(0), line)}val orderRDD: RDD[(String, String)] = sc.textFile("data/test_spark_02.txt", 8).map {line => val fields = line.split(";")(fields(2), line)}val resultRDD = productRDD.join(orderRDD)println(resultRDD.count())Thread.sleep(100000)sc.stop()}}

编译打包1

mvn clean package

并上传到服务器,准备运行
在这里插入图片描述

运行测试1

spark-submit --master local[*] --class icu.wzk.JoinDemo spark-wordcount-1.0-SNAPSHOT.jar

提交任务并执行,注意数据的路径,查看下图:
在这里插入图片描述
运行结果可以查看到,运行了: 2.203100 秒 (取决于你的数据量的多少)
在这里插入图片描述

2024-07-19 10:35:08,808 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(54)) - Job 0 finished: count at JoinDemo.scala:32, took 2.203100 s
200

编写代码2

接下来,我们对比使用 MapSideJoin 的方式

package icu.wzkimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object MapSideJoin {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("MapSideJoin").setMaster("local[*]")val sc = new SparkContext(conf)sc.hadoopConfiguration.setLong("fs.local.block.size", 128 * 1024 * 1024)val productRDD: RDD[(String, String)] = sc.textFile("data/test_spark_01.txt").map {line => val fields = line.split(";")(fields(0), line)}val productBC = sc.broadcast(productRDD.collectAsMap())val orderRDD: RDD[(String, String)] = sc.textFile("data/test_spark_02.txt").map {line => val fields = line.split(";")(fields(2), line)}val resultRDD = orderRDD.map {case (pid, orderInfo) =>val productInfo = productBC.value(pid, (orderInfo, productInfo.getOrElse(pid, null)))}println(resultRDD.count())sc.stop()}}

编译打包2

mvn clean package

编译后上传到服务器准备执行:
在这里插入图片描述

运行测试2

spark-submit --master local[*] --class icu.wzk.MapSideJoin spark-wordcount-1.0-SNAPSHOT.jar

启动我们的程序,并观察结果
在这里插入图片描述
我们可以观察到,这次只用了 0.10078 秒就完成了任务:
在这里插入图片描述

累加器

基本介绍

累加器的作用:可以实现一个变量在不同的Executor端能保持状态的累加。
累加器在Driver端定义、读取,在Executor中完成累加。
累加器也是Lazy的,需要Action触发:Action触发一次,执行一次;触发多次,执行多次。

Spark内置了三种类型的累加器,分别是:

  • LongAccumulator 用来累加整数型
  • DoubleAccumulator 用来累加浮点型
  • CollectionAccumulator 用来累加集合元素

运行测试

我们可以在 SparkShell 中进行一些简单的测试,目前我在 h122 节点上,启动SparkShell

spark-shell --master local[*]

启动的主界面如下:
在这里插入图片描述
写入如下的内容进行测试:

val data = sc.makeRDD("hadoop spark hive hbase java scala hello world spark scala java hive".split("\\s+"))
val acc1 = sc.longAccumulator("totalNum1")
val acc2 = sc.doubleAccumulator("totalNum2")
val acc3 = sc.collectionAccumulator[String]("allwords")

我们进行测试的结果如下图所示:
在这里插入图片描述
继续编写一段进行测试:

val rdd = data.map{word => acc1.add(word.length); acc2.add(word.length); acc3.add(word); word}
rdd.count
rdd.collectprintln(acc1.value)
println(acc2.value)
println(acc3.value)

我们进行测试的结果如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/408233.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

井盖异动传感器:为城市安全加码

城市的地下管网错综复杂,井盖作为连接地面与地下的重要通道,其安全性至关重要。然而,由于各种原因导致的井盖丢失或损坏事件时有发生,给行人和车辆带来了极大的安全隐患。 一、智能科技,守护脚下安全 旭华智能井盖异…

SpringBoot2:创建项目及启动时相关报错整理

1、创建时报错 Initialization failed for https://start.aliyun.com/ Please check URL, network and proxy settings.Error message: Error parsing JSON response换官网地址初始化即可:https://start.spring.io/ 那么,大家肯定会疑问,官网…

【Java】Spring Boot使用 Email 传邮件 (上手图解)

Java系列文章目录 补充内容 Windows通过SSH连接Linux 第一章 Linux基本命令的学习与Linux历史 文章目录 Java系列文章目录一、前言二、学习内容:三、问题描述四、解决方案:4.1 认识依赖4.2 发送邮件步骤4.2.1 先获取授权码4.2.1 邮件配置4.2.2 主体内容…

堆《数据结构》

堆《数据结构》 1. 堆排序1.1 建堆向上调整建堆向下调整建堆 1.2 利用堆删除思想来进行排序1.3Top-k问题 2.堆的时间复杂度 1. 堆排序 1.1 建堆 建大堆 建小堆 向上调整建堆 AdjustUp建堆 void AdjustUp(HPDataType* a, int child) {// 初始条件// 中间过程// 结束条件int p…

【数据分析:RFM客户价值度模型】

前言: 💞💞大家好,我是书生♡,本阶段和大家一起分享和探索大数据技术RFM客户价值度模型,本篇文章主要讲述了:RFM客户价值度模型等等。欢迎大家一起探索讨论!!&#xff01…

机械学习—零基础学习日志(如何理解概率论7)

这里需要先理解伯努利试验。只有A与A逆,两种结果。 正态分布 再来一道习题~: 解析: 《概率论与数理统计期末不挂科|考研零基础入门4小时完整版(王志超)》学习笔记 王志超老师 (UP主)

5大分区管理器 - 最佳硬盘分区软件

分区是一个计算机术语,指的是在硬盘上创建多个区域,以便操作系统和分区管理软件能够高效地分别管理每个区域中的信息。经常使用电脑的人很可能会从拥有多个分区中受益。在硬盘上拥有分区的一个好处是,可以更轻松地将操作系统和程序文件与用户…

普元EOS-低开页面下拉选择控件加载列表数据

1 前言 普元EOS进行低代码开发页面可以高效提高开发效率,并且减少代码的出错机会。 在低代码开发页面的时候,表单页面中可以使用大量的常用控件。 本文将讲解下拉选择组件的使用。 2 下拉选择使用EOS内置字典作为数据源 下拉选择可从字典作为数据源&a…

粘包现象 | wireshark抓包的使用

在TCP协议的通信过程中,由于其面向流的特性,数据在传输过程中可能会发生粘包现象,即多个发送的数据包被接收方一次性接收,导致应用层无法正确解析数据。 1.粘包现象概述 TCP协议为了保证传输效率,可能会将多次send调…

ESP RainMaker OTA 自动签名功能的安全启动

【如果您之前有关注乐鑫的博客和新闻,那么应该对 ESP RainMaker 及其各项功能有所了解。如果不曾关注,建议先查看相关信息,知晓本文背景。】 在物联网系统的建构中,安全性是一项核心要素。乐鑫科技对系统安全给予了极高的重视。ES…

小程序学习day13-API Promise化、全局数据共享(状态管理)、分包

44、API Promise化 (1)基于回调函数的一部API的缺点:小程序官方提供的异步API都是基于回调函数实现的,容易造成回调地狱的问题,代码可读性、可维护性差 (2)API Promise化概念: 指…

【HarmonyOS NEXT星河版开发实战】页面跳转

个人主页→VON 收录专栏→鸿蒙综合案例开发​​​​​ 代码及其图片资源会发布于gitee上面(已发布) gitee地址https://gitee.com/wang-xin-jie234 目录 前言 界面功能介绍 界面构建过程 知识点概述 页面跳转 页面传参 全套源代码 Index页面 Sec…

C语言学习——文件

目录 十三、文件 13.1C文件概述 13.2文件类型指针 13.3文件的打开与关闭 文件的打开(fopen函数) 文件的关闭(fclose函数) 13.4文件的读写 fputc函数和fgetc函数(putc函数和getc函数) fread函数和fw…

【qt】自定义信号

我们在上篇中,服务器收到的消息是由线程类去处理的,消息在线程类中,传不到widget中的ui中去,如果我们要在界面显示客户端的消息,必须通过自定义信号. 1.构建信号 当线程收到信息,就会被填充在ba中&#xf…

PHPShort轻量级网址缩短程序源码开心版,内含汉化包

需要网址缩短并且想获得更多有关链接点击率和流量的数据分析,那么 PHPShort 可能是一个非常好的选择。PHPShort 是一款高级的 URL 缩短器平台,可以帮助你轻松地缩短链接,并根据受众群体的位置或平台来定位受众。 该程序基于 Laravel 框架编写…

Fiddle抓手机app的包

前言 本次文章讲述的是,fiddle获取手机代理,从而获取手机app的http、https请求! 一.下载安装汉化Fiddle 1.点击Fiddler官网下载链接:Download Fiddler Web Debugging Tool for Free by Telerik 2.直接运行,选择自己需…

Linux:进程的概念,进程相关函数

一、进程的概念 1.进程 进程是系统进行资源分配和调度的一个独立单元,它是操作系统结构的基础。进程是程序的一次执行过程,包含了程序代码、当前活动、系统资源(如CPU、内存、文件等)的使用情况等信息。每个进程都有自己独立的内…

AUTOSAR_EXP_ARAComAPI.pdf的第4章笔记

4 Fundamentals 为了理解AUTOSAR_EXP_ARAComAPI.pdf的第4章内容,生搬硬套的翻译了一把,准备先囫囵吞枣,再仔细理解。因为这些内容的理解也不是一时半会儿的。所以先放上来。 AUTOSAR_EXP_ARAComAPI.pdf的概述 因此,ara::com不提…

VS2022 - 制作自己的C#类库dll,并输出Unity识别的pdb调试信息文件

然后编写库代码,设置dll生成目录 *** 输出unity可以识别的pdb调试信息文件 *** 右键项目-属性-生成-高级-调试信息:可移植(Portable PDB) 这是因为Unity只能识别MDB和Portable PDB文件 这样设置后,把dll和pdb文件放入到Unity中同文件夹下&…

002、架构_概览

GoldenDB 主要由管理节点、计算节点、数据节点、全局事务节点等模块组成,各个节点无需共享任何资源,均为独立自治的通用计算机节点,之间通过高速互联的 网络通讯,从而完成对应用数据请求的快速处理和响应。 管理节点在数据库中主要…