【大数据学习 | Spark-Core】广播变量和累加器

1. 共享变量

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。

累加器用来对信息进行聚合,相当于mapreduce中的counter;而广播变量用来高效分发较大的对象,相当于semijoin中的DistributedCache 。

共享变量出现的原因:

我们传递给Spark的函数,如map(),或者filter()的判断条件函数,能够利用定义在函数之外的变量,但是集群中的每一个task都会得到变量的一个副本,并且task在对变量进行的更新不会被返回给driver。

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object TestAcc {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test acc")conf.setMaster("local[*]")val sc = new SparkContext(conf)val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9),3)val count = rdd.map(t=> 1).reduce(_+_)println(count)//    val acc = sc.longAccumulator("count")
//
//    rdd.foreach(t=>{
//      acc.add(1)
//    })
//
//    println(acc.value)//    println(rdd.count())}
}

原因总结:

对于executor端,driver端的变量是外部变量。

excutor端修改了变量count,根本不会让driver端跟着修改。如果想在driver端得到executor端修改的变量,需要用累加器实现。

当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低,也可能会造成内存溢出。使用广播变量以后,在每个Executor中只有一个Driver端变量副本,在一个executor中的并行执行的task任务会引用该一个变量副本即可,需要广播变量提高运行效率。

2. 累加器

累加器的执行流程:

通过SparkContext创建一个累加器并初始化。当driver端将任务分发给executor时,每个executor会接收一个任务和一个引用到该累加器的副本。每个executor上的任务可以调用累加器的add方法来增加累加器的值,这些操作是线程安全的,因为每个任务都会在自己的executor线程中执行。当每个任务完成,executor将累加器的更新值发送到driver端进行聚合过程,得到最终的聚合结果。

累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。

用法:

var acc: LongAccumulator = sc.longAccumulator // 创建累加器acc.add(1) // 累加器累加acc.value // 获取累加器的值

累加器的简单使用

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object WordCountWithAcc {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test acc")conf.setMaster("local[*]")val sc = new SparkContext(conf)val acc = sc.longAccumulator("bad word")sc.textFile("data/a.txt").flatMap(_.split(" ")).filter(t=>{if(t.equals("shit")){acc.add(1)false}elsetrue}).map((_,1)).reduceByKey(_+_).foreach(println)println("invalid words:"+acc.value)}
}

3. 广播变量

ip转换工具

public class IpUtils {public static Long ip2Long(String ip) {String fragments[] = ip.split("[.]");Long ipNum = 0L;for(int i=0;i<fragments.length;i++) {ipNum = Long.parseLong(fragments[i]) | ipNum << 8L;}return ipNum;}
}

ip案例代码

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object IpTest {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ip")conf.setMaster("local[*]")val sc = new SparkContext(conf)val accessRDD = sc.textFile("data/access.log").map(t=>{val strs = t.split("\\|")IpUtils.ip2Long(strs(1))})val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{val strs = t.split("\\|")(strs(2).toLong,strs(3).toLong,strs(6)+strs(7))}).collect()//    accessRDD.map(ip=>{
//      ipRDD.filter(t=>{
//        ip>= t._1 && ip<= t._2
//      })
//    }).foreach(println)accessRDD.map(ip=>{ipArr.find(t=>{t._1<= ip && t._2>=ip}) match {case Some(v) => (v._3,1)case None => ("unknow",1)}//option}).reduceByKey(_+_).foreach(println)}
}

使用广播变量可以使程序高效地将一个很大的只读数据发送到executor节点,会将广播变量放到executor的BlockManager中,而且对每个executor节点只需要传输一次,该executor节点的多个task可以共用这一个。

用法:

val broad: Broadcast[List[Int]] = sc.broadcast(list) // 把driver端的变量用广播变量包装broad.value // 从广播变量获取包装的数据,用于计算

我们可能遇到这样的问题:如果我们需要广播的数据为100M,如果需要driver端亲自向每个executor端发送100M的数据,在工作中executor节点的个数可能是很多的,比如是200个,这意味着driver端要发送20G的数据,这对于driver端的压力太大了。所以要用到比特洪流技术。

就是说driver端不必向每个executor发送一份完整的广播变量的数据,而是将一份广播变量切分成200份,发送给两百个executor,然后200个executor间通过BlockManager中的组件transferService与其他executor通信,进行完整的数据。

这样driver端只需要发送一份广播变量的数据,压力就会小很多,而且其他executor也都拿到了这一份广播变量的数据 。

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}object IpTest {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("ip")conf.setMaster("local[*]")val sc = new SparkContext(conf)val accessRDD = sc.textFile("data/access.log").map(t=>{val strs = t.split("\\|")IpUtils.ip2Long(strs(1))})val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{val strs = t.split("\\|")(strs(2).toLong,strs(3).toLong,strs(6)+strs(7))}).collect()val bs = sc.broadcast(ipArr)//    accessRDD.map(ip=>{//      ipRDD.filter(t=>{//        ip>= t._1 && ip<= t._2//      })//    }).foreach(println)accessRDD.map(ip=>{bs.value.find(t=>{t._1<= ip && t._2>=ip}) match {case Some(v) => (v._3,1)case None => ("unknow",1)}//option}).reduceByKey(_+_).foreach(println)}
}

为了提高查找的效率,可以使用二分法查找代码。将时间复杂度由O(n)优化到了O(logn)。

      val start = System.currentTimeMillis()val res =  (binarySearch(ip,bs.value),1)
//      val res = bs.value.find(t=>{
//        t._1<= ip && t._2>=ip
//      }) match {
//        case Some(v) => (v._3,1)
//        case None => ("unknow",1)
//      }val end = System.currentTimeMillis()acc.add(end-start)

累加器实现运行时间的统计

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/478286.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

STM32编程小工具FlyMcu和STLINK Utility 《通俗易懂》破解

FlyMcu FlyMcu 模拟仿真软件是一款用于 STM32 芯片 ISP 串口烧录程序的专用工具&#xff0c;免费&#xff0c;且较为非常容易下手&#xff0c;好用便捷。 注意&#xff1a;STM32 芯片的 ISP 下载&#xff0c;只能使用串口1&#xff08;USART1&#xff09;&#xff0c;对应的串口…

MTK主板_安卓主板方案_MTK联发科主板定制开发

联发科(MTK)主板以其强大的性能和多样化的功能而受到广泛关注。该平台包括多个型号&#xff0c;例如MT6761、MT8766、MT6762、MT6765、MT8768和MT8788等&#xff0c;均配置了四核或八核64位处理器&#xff0c;主频可高达2.0GHz。采用先进的12nm工艺&#xff0c;搭载Android 11.…

信息收集(1)

学习视频引路信息收集&#xff08;1&#xff09;_哔哩哔哩_bilibili View信息收集&#xff08;1&#xff09; 分享一个漏洞挖掘平台&#xff1a;补天 以吉林通用航空职业技术学院|官网 (jlthedu.com)为目标 第一步&#xff1a;查看cdn和域名被注册的信息 可以查询域名信息的…

React(六)——Redux

文章目录 项目地址基本理解一、配置Redux store二、创建slice配置到store里并使用三、给Slice配置reducers&#xff0c;用来修改初始值 项目地址 教程作者&#xff1a;教程地址&#xff1a; 代码仓库地址&#xff1a; 所用到的框架和插件&#xff1a; dbt airflow基本理解 s…

如何利用ATECLOUD平台来实现数据报告的导出和数据分析?-纳米软件

1.数据报告导出 选择报告模板&#xff1a;ATECLOUD 平台通常会提供多种预设的数据报告模板&#xff0c;这些模板是根据不同的测试场景和需求设计的。例如&#xff0c;在电源模块测试中&#xff0c;有针对输出电压、电流、功率等基本参数的报告模板&#xff0c;也有包含纹波系数…

[ZJCTF 2019]NiZhuanSiWei

[ZJCTF 2019]NiZhuanSiWei 上面代码&#xff0c;使用get上传了三个参数&#xff0c;在text者用力恒等于&#xff0c;然后就输出&#xff0c;接着第二个参数中出现flag就输出not now&#xff0c;接着第三个参数是反序了一下输出。 ?textdata://text/plain,welcome to the zjct…

JSONCPP 数据解析与序列化

常用类接口 Json::Value 类 用于存储 JSON 数据的核心类。它支持将数据解析为对象、数组或基本类型&#xff08;如字符串、数值等&#xff09; 赋值操作符&#xff1a;Value& operator(Value other); 用于将一个 JSON 值赋给另一个 JSON 值 Json::Value value; value &…

40分钟学 Go 语言高并发:【实战】并发安全的配置管理器(功能扩展)

【实战】并发安全的配置管理器&#xff08;功能扩展&#xff09; 一、扩展思考 分布式配置中心 实现配置的集中管理支持多节点配置同步实现配置的版本一致性 配置加密 敏感配置的加密存储配置的安全传输访问权限控制 配置格式支持 支持YAML、TOML等多种格式配置格式自动…

ChatGPT 桌面版发布了,如何安装?

本章教程教大家如何进行安装。 一、下载安装包 官网地址地址&#xff1a;https://openai.com/chatgpt/desktop/ 支持Windows和MacOS操作系统 二、安装步骤 Windows用户下载之后&#xff0c;会有一个exe安装包&#xff0c;点击运行安装即可。 注意事项&#xff0c;如果Windows操…

【Electron学习笔记(二)】基于Electron开发应用程序

基于Electron开发本地应用程序 基于Electron开发本地应用程序前言正文1、创建 pages 目录2、创建 index.html 文件3 、创建 html.css 文件4 、main.js里引入页面5 、运行 start 命令6 、启用开发者模式7 、解决内容安全策略8、完善窗口行为9、配置自动重启&#xff0c;保存后自…

力扣--LCR 154.复杂链表的复制

题目 请实现 copyRandomList 函数&#xff0c;复制一个复杂链表。在复杂链表中&#xff0c;每个节点除了有一个 next 指针指向下一个节点&#xff0c;还有一个 random 指针指向链表中的任意节点或者 null。 提示&#xff1a; -10000 < Node.val < 10000 Node.random 为…

windows server 2019 启动 nginx 报错

环境 &#xff1a;windows server 2019 &#xff0c;nginx-1.19.7 背景&#xff1a; 自己经常用这个 nginx 包作为 web 服务器。今天发现 部署到 server 2019 上直接报错了。这可是原生的包&#xff0c;我啥也没改&#xff0c;怎么可能报错。而且之前在 其他服务器用都没问题…

在ASP.NET Core WebAPI 中使用轻量级的方式实现一个支持持久化的缓存组件

前言 在 WebAPI 开发中&#xff0c;缓存是一种常用的优化手段。Redis 是广泛使用的缓存解决方案&#xff0c;但在某些场景下&#xff0c;我们可能不希望引入第三方依赖&#xff0c;而是希望使用轻量级的方式实现一个支持持久化的缓存组件&#xff0c;满足以下需求&#xff1a;…

【区块链】深入理解椭圆曲线密码学(ECC)

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 深入理解椭圆曲线密码学(ECC)1. 概述2. 椭圆曲线的数学基础2.1 基本定义2.2 有限…

内存不足引发C++程序闪退崩溃问题的分析与总结

目录 1、内存不足一般出现在32位程序中 2、内存不足时会导致malloc或new申请内存失败 2.1、malloc申请内存失败&#xff0c;返回NULL 2.2、new申请内存失败&#xff0c;抛出异常 3、内存不足项目实战案例中相关细节与要点说明 3.1、内存不足导致malloc申请内存失败&#…

设计模式之代理模式(模拟mybatis-spring中定义DAO接口,使用代理类方式操作数据库原理实现场景)

前言&#xff1a; 写写CRUD&#xff0c;不会的百度一下&#xff0c;就完事了&#xff0c;总觉得别人问的东西像在造火箭一样。但在高体量、高并发的业务场景下&#xff0c;每一次的压测优化&#xff0c;性能提升&#xff0c;都像在研究一道数学题一样&#xff0c;反复的锤炼&am…

Java项目实战II基于微信小程序的图书馆自习室座位预约平台(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。 一、前言 在知识爆炸的时代&#xff0c;图书馆和…

【机器学习】——卷积与循环的交响曲:神经网络模型在现代科技中的协奏

&#x1f3bc;个人主页&#xff1a;【Y小夜】 &#x1f60e;作者简介&#xff1a;一位双非学校的大二学生&#xff0c;编程爱好者&#xff0c; 专注于基础和实战分享&#xff0c;欢迎私信咨询&#xff01; &#x1f386;入门专栏&#xff1a;&#x1f387;【MySQL&#xff0…

智慧社区管理系统平台提升物业运营效率与用户体验

内容概要 智慧社区管理系统平台是一个集成了多项功能的综合性解决方案&#xff0c;旨在通过先进的技术手段提升物业管理的效率和居民的生活质量。该平台不仅关注物业运营的各个方面&#xff0c;还强调用户体验的重要性。随着科技的发展&#xff0c;社区管理方式正发生着翻天覆…