Spark---累加器和广播变量

文章目录

  • 1.累加器实现原理
  • 2.自定义累加器
  • 3.广播变量

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)var sum=0dataRdd.foreach(num=>sum+=num)println(sum)context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))val sum = context.longAccumulator("sum")dataRdd.foreach(num=>{//使用累加器sum.add(num)})//获取累加器的值println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqiimport bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable/*** 使用累加器完成WordCount案例*/
object Spark_addDemo {def main(args: Array[String]): Unit = {//建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")//创建累加器对象val wordCountAccumulator = new WordCountAccumulator//向Spark中进行注册context.register(wordCountAccumulator,"wordCountAccumulator")//实现累加dataRDD.foreach(word => {wordCountAccumulator.add(word)})//获取累加结果,打印在控制台上println(wordCountAccumulator.value)//关闭链接context.stop()}}class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{//定义一个map用于存储累加后的结果var map: mutable.Map[String, Long] =mutable.Map[String,Long]()//累加器是否为初始状态override def isZero: Boolean = {map.isEmpty}//复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new WordCountAccumulator()}//重置累加器override def reset(): Unit = {map.clear()}//向累加器添加数据INoverride def add(word: String): Unit = {// 查询 map 中是否存在相同的单词// 如果有相同的单词,那么单词的数量加 1// 如果没有相同的单词,那么在 map 中增加这个单词val newValue = map.getOrElse(word, 0L) + 1map.update(word,newValue)}//合并累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {var map1=this.mapvar map2=other.value//合并两个mapmap2.foreach({case (word,count)=>{val newValue = map1.getOrElse(word,0L)+countmap1.update(word,newValue)}})}//返回累加器的结果(OUT)override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

3.广播变量

在Apache Spark中,广播变量(Broadcast Variables)是一种特殊类型的变量,用于优化数据共享。当Spark应用程序在集群中的多个节点上运行时,每个节点都需要访问相同的数据集。如果数据集很大,那么将这些数据发送到每个节点可能会非常耗时和低效。 广播变量提供了一种方法,可以在集群中的所有节点上共享数据集的一个只读副本 ,从而避免了在每个节点上重复发送数据。

在这里插入图片描述
广播变量也叫分布式只读变量,它可以将闭包数据发送到每个Executor的内存中来达到共享的目的,Executor其实就相当于一个JVM,在启动的时候会自动分配内存。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val rdd1 = context.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))// 声明广播变量val broadcast: Broadcast[List[(String, Int)]] = context.broadcast(list)val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {case (key, num) => {var num2 = 0// 使用广播变量for ((k, v) <- broadcast.value) {if (k == key) {num2 = v}}(key, (num, num2))}}resultRDD.collect().foreach(print)context.stop()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/240193.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV-19图像的仿射变换

放射变换是图像旋转&#xff0c;缩放&#xff0c;平移的总称&#xff0c;具体的做法是通过一个矩阵和原图片坐标进行计算&#xff0c;得到新的坐标&#xff0c;完成变换&#xff0c;所以关键就是这个矩阵。 一、仿射变换之图像平移 使用API------warpAffine&#xff08;src &…

微信小程序上传并显示图片

实现效果&#xff1a; 上传前显示&#xff1a; 点击后可上传&#xff0c;上传后显示&#xff1a; 源代码&#xff1a; .wxml <view class"{{company_logo_src?blank-area:}}" style"position:absolute;top:30rpx;right:30rpx;height:100rpx;width:100rp…

C++(1) —— 基础语法入门

目录 一、C初识 1.1 第一个C程序 1.2 注释 1.3 变量 1.4 常量 1.5 关键字 1.6 标识符命名规则 二、数据类型 2.1 整型 2.2 sizeof 关键字 2.3 实型&#xff08;浮点型&#xff09; 2.4 字符型 2.5 转义字符 2.6 字符串型 2.7 布尔类型 bool 2.8 数据的输入 三…

idea使用docker-compose发布应用程序

非常重要的话说在前头 idea要想使用docker-compose&#xff0c;不能使用ssh创建idea Docker&#xff0c;而需要使用socket创建idea Docker。 socket docker是不安全的&#xff0c;任何人都可以访问你的docker&#xff0c;所以只能测试环境使用&#xff0c;请勿在正式环境使用s…

ubuntu设置每天定时关机

ubuntu设置每天定时关机 终端输入命令&#xff1a; sudo crontab -e输入密码&#xff0c;回车。 我这里使用nano作为编辑器&#xff0c;你可以选择vim。 在末尾输入以下命令&#xff1a; 59 23 * * * sudo -u root shutdown now设置&#xff1a;每天23:59分&#xff0c;电脑…

pyqtgraph绘图类

pyqtgraph绘图类 pyqtgraph绘图有四种方法: 方法描述pyqtgraph.plot()创建一个新的QWindow用来绘制数据PlotWidget.plot()在已存在的QWidget上绘制数据PlotItem.plot()在已存在的QWidget上绘制数据GraphicsLayout.addPlot()在网格布局中添加一个绘图 上面四个方法都接收同样…

Floyd - Warshall算法

顶点 public class Vertex {String name;List<Edge> edges;// 拓扑排序相关int inDegree;int status; // 状态 0-未访问 1-访问中 2-访问过&#xff0c;用在拓扑排序 ​// dfs, bfs 相关boolean visited;//是否被访问过 ​// 求解最短距离相关private static final int …

利用低代码技术,企业怎样开拓数字化转型新路径?

近年来&#xff0c;随着技术的发展和市场竞争的加剧&#xff0c;企业数字化转型已成为一种趋势。许多企业已经完成了线上协作办公的初步转型&#xff0c;这主要得益于像钉钉、企微等发展完善的平台&#xff0c;只需将员工全部拉入这些平台&#xff0c;就能实现线上协作办公。 然…

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题一 模块二

竞赛需要完成三个阶段的任务&#xff0c;分别完成三个模块&#xff0c;总分共计 1000分。三个模块内容和分值分别是&#xff1a; 1.第一阶段&#xff1a;模块一 网络平台搭建与设备安全防护&#xff08;180 分钟&#xff0c;300 分&#xff09;。 2.第二阶段&#xff1a;模块二…

muduo网络库剖析——通道Channel类

muduo网络库剖析——通道Channel类 前情从muduo到my_muduo 概要事件种类channel 框架与细节成员函数细节实现使用方法 源码结尾 前情 从muduo到my_muduo 作为一个宏大的、功能健全的muduo库&#xff0c;考虑的肯定是众多情况是否可以高效满足&#xff1b;而作为学习者&#x…

Linux学习记录——사십삼 高级IO(4)--- Epoll型服务器(1)

文章目录 1、理解Epoll和对应接口2、简单实现 1、理解Epoll和对应接口 poll依然需要OS去遍历所有fd。一个进程去多个特定的文件中等待&#xff0c;只要有一个就绪&#xff0c;就使用select/poll系统调用&#xff0c;让操作系统把所有文件遍历一遍&#xff0c;哪些就绪就加上哪…

07-微服务getaway网关详解

一、初识网关 在微服务架构中&#xff0c;一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用这么多的微服务呢&#xff1f;如果没有网关的存在&#xff0c;我们只能在客户端记录每个微服务的地址&#xff0c;然后分别去调用。这样的话会产生很多问题&#xff0c;例…

Databend 开源周报第 128 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 使用 Databend …

BuildRoot配置RTL8822CE WIFIBT模块(WIFI部分)

TinkerBoard2主板自带的无线模块为RTL8822CE&#xff0c;PCIe接口 之前在风火轮下载的Linux源码编译出来的BuildRoot根文件系统没有相关的驱动文件 [rootrk3399:/]# find . -name *.ko [rootrk3399:/]# lsmod Module Size Used by Not tainted [rootrk33…

UI设计中插画赏析和产品色彩分析

插画赏析&#xff1a; 1. 插画是设计的原创性和艺术性的基础 无论是印刷品、品牌设计还是UI界面&#xff0c;更加风格化的插画能够将不同的风格和创意加入其中&#xff0c;在激烈的竞争中更容易因此脱颖而出。留下用户才有转化。 2. 插画是视觉触发器&#xff0c;瞬间传达大量…

ARM day1

一、概念 ARM可以工作的七种模式用户、系统、快中断、中断、管理、终止、未定义ARM核的寄存器个数 37个32位长的寄存器&#xff0c;当前处理器的模式决定着哪组寄存器可操作&#xff0c;且任何模式都可以存取&#xff1a; PC(program counter程序计数器) CPSR(current program…

自存angular cli创建分区的module

创建module ng g module /admin/promotion --routing 目标文件夹下会有 正常创建组件 在上一级路由中写 promotion的路由 {path: "promotion", //推广loadChildren: () >import("./promotion/promotion.module").then((m) > m.PromotionModul…

详解React与Vue的性能对比

React 和 Vue 是当前最流行的前端开发框架之一。它们都具有高度的灵活性和可扩展性&#xff0c;但在某些方面有所不同。在本篇文章中&#xff0c;我将详细介绍 React 和 Vue 这两个技术&#xff0c;并比较它们的优点和缺点。 目录 1. React&#xff1a; 1.1 优点&#xff1a; …

力扣白嫖日记(sql)

前言 练习sql语句&#xff0c;所有题目来自于力扣&#xff08;https://leetcode.cn/problemset/database/&#xff09;的免费数据库练习题。 今日题目&#xff1a; 586.订单最多的客户 表&#xff1a;Orders 列名类型order_numberintcustomer_numberint 查找下了最多订单的…

QT第五天

使用QT绘图和绘图事件&#xff0c;完成仪表盘绘图&#xff0c;如下图&#xff1a; 程序运行结果&#xff1a; 代码&#xff1a; widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPainter> #include <QPen> #include <QBrush&…