Spark累加器

1. 累加器

累加器:分布式共享只写变量

考虑如下计算RDD中数据的和:

val rdd = sc.makeRDD(List(1, 2, 3, 4))var sum = 0
rdd.foreach(num => {sum += num}
)println("sum = " + sum)

预期结果10,但其实不是

foreach里面的函数是在Executor节点分布式执行的,所以多个分布式节点来同时修改sum,但是这个sum是Driver传给Executor的,各个Executor执行完并不会将sum返回给Driver,所以Driver端执行打印sum,sum依然为0。如果需要将最终的计算结果返回给Driver,就要使用到累加器变量。

累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的累加器变量,在Executor端会得到这个变量的一个新的副本,执行任务更新完这个副本之后,传回Driver端进行聚合。

使用累加器:

val rdd = sc.makeRDD(List(1, 2, 3, 4))val sum = sc.longAccumulator("sum")rdd.foreach(num => {sum.add(num)
})println("sum = " + sum.value)

如果转换算子中使用了累加器,最终没有调用行动算子,那么实际也不会执行计算,例如下面代码sum依然为0 

val rdd = sc.makeRDD(List(1, 2, 3, 4))val sum = sc.longAccumulator("sum")rdd.map(num => {sum.add(num)num
})println("sum = " + sum.value)

注意,使用累加器最终执行行动算子不止一次,可能最终结果不符合预期

val rdd = sc.makeRDD(List(1, 2, 3, 4))val sum = sc.longAccumulator("sum")val mapRDD = rdd.map(num => {sum.add(num)num
})mapRDD.collect()
mapRDD.collect()
println("sum = " + sum.value)

所以一般累加器需要放在行动算子里进行操作。

使用累加器实现WordCount,以避免Shuffle操作。首先定义一个自定义累加器,以实现map的累加(因为Spark中只有List集合的累加)

// AccumulatorV2泛型:IN:输入类型; OUT:输出类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Long]] {private var wcMap = mutable.Map[String, Long]// 判断是否是初始状态override def isZero : Boolean = {wcMap.isEmpty}override def copy() : AccumulatorV2[String, mutable.Map[String, Long]] = {new MyAccumulator()}override def reset() : Unit = {wcMap.clear()}// 累加函数,每新来一个输入,如何累加override def add(word : String) : Unit = {val newCnt = wcMap.getOrElse(word, 0L) + 1wcMap.update(word, newCnt);}// 合并函数,由Driver执行,这里本质是两个map的合并override def merge(other : AccumulatorV2[String, mutable.Map[String, Long]]) : Unit = {val map1 = wcMapval map2 = othermap2.foreach{case(word, count) => {val newCnt = map1.getOrElse(word, 0L) + countmap1.update(word, newCnt)}}
}override def value : mutable.Map[String, Long] = {wcMap}
}
val rdd = sc.makeRDD(List("hello", "spark", "hello"))val myAcc = sc.newMyAccumulator()
sc.register(myAcc, "wordCountAcc")rdd.foreach(word => {myAcc.add(word)
})println(myAcc.value)

2. 广播变量

累加器:分布式共享只读变量

如果想实现两个rdd的相同key的value连接起来,一般会想到join算子,但是join是笛卡尔乘积,且存在shuffle,严重影响性能。可采用如下代码实现类似功能:

val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))rdd1.map {case(w, c) => {val l : Int = map.getOrElse(w, 0)(w, (c, l))}
}.collect.foreach(println)

上述代码存在一个问题,map算子内的函数是在Executor内执行的(具体来说是Executor里的task执行的),里面用到了map这份数据,如果map很大,那么每个Executor的每个task都会包含这份大数据,非常占用内存,影响性能。于是引入了广播变量的概念,将这种数据存放在每一个Executor的内存中,该Executor中的各个task共享这个只读变量:

val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))val map = mutable.Map(("a", 4), ("b", 5), ("c", 6))val bc : Broadcast[mutable.Map[String, Int]] = sc.broadcast(map)rdd1.map {case(w, c) => {val l : Int = bc.value.getOrElse(w, 0)(w, (c, l))}
}.collect.foreach(println)

3. 案例需求

1)已经从原始日志文件中读出了商品的点击数量rdd、下单数量rdd、支付数量rdd,都是(商品id, cnt)的形式,需要将这三种rdd组合成(商品id, (点击数量, 下单数量, 支付数量))的rdd,并且依次按照点击数量、下单数量、 支付数量排序取前十。

很自然地想到组合rdd的算子join,但是join只能组合相同的key,如果一个商品只有点击没有下单,那么使用join是不会出现在最终结果的,同理leftOuterJoin和rightOuterJoin也是类似的,不能实现相应的功能。因此只有cogroup算子满足要求,cogroup = connect + group。(join和cogroup算子的功能示例参见:RDD算子介绍(三))

val cogroupRDD : RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))]= clickRDD.cogroup(orderRDD, payRDD)val resultRDD = cogroupRDD.mapValues{case(clickIter, orderIter, payIter) => {var clickCnt = 0val iter1 = clickIter.iteratorif (iter1.hasNext) {clickCnt = iter1.next()}var orderCnt = 0val iter2 = orderIter.iteratorif (iter2.hasNext) {orderCnt = iter2.next()}var payCnt = 0val iter3 = payIter.iteratorif (iter3.hasNext) {payCnt = iter1.next()}(clickCnt, orderCnt, payCnt)}
}resultRDD.sortBy(_._2, false).take(10).foreach(println)

 上述实现方式使用了cogroup算子,该算子可能存在shuffle,且该算子不常用,可以采用另外一种方式实现。首先将商品的点击数量rdd、下单数量rdd、支付数量rdd转换为(商品id, (clickCnt, orderCnt, payCnt))的形式,然后进行union,最后进行聚合。

val rdd1 = clickRDD.map {case (id, cnt) => {(id, (cnt, 0, 0)}
}val rdd2 = orderRDD.map {case (id, cnt) => {(id, (0, cnt, 0)}
}val rdd3 = payRDD.map {case (id, cnt) => {(id, (0, 0, cnt)}
}val rdd : RDD[(String, (Int, Int, Int))] = rdd1.union(rdd2).union(rdd3)val resultRDD = rdd.reduceByKey((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2, t3._3 + t3._3)
})resultRDD.sortBy(_._2, false).take(10).foreach(println)

上述代码使用了reduceByKey,还是会有shuffle操作,要完全避免shuffle操作,可以使用foreach算子,如果要使用foreach算子,就必须使用累加器了。这个累加器的作用就是遍历原始数据,按照商品id进行分组,对商品的用户行为类型(点击、下单、支付)对进行数量统计。可以将商品的用户行为(点击、下单、支付)数量封装成一个样例类HotCatagory,然后这个累加器的输入就是商品id+行为类型(点击、下单、支付),输出就是这个样例类的集合。具体实现过程参考https://www.bilibili.com/video/BV11A411L7CK?p=116&spm_id_from=pageDriver&vd_source=23ddeeb1fb342c5293413f7b87367160​​​​​​​

2)统计页面的跳转率。所谓某个页面A到某个页面B的跳转率,就是页面A到页面B的次数/页面A的点击次数。首先统计各个页面的点击次数,数据结构为map,作为分母。对于分子,需要按照sessionId进行分组,然后按照时间排序,这样才能得到各个用户浏览页面的顺序,然后转换数据结构,统计各个页面到其他页面的跳转次数。

actionDataRDD.cache()// 分母
val pageIdToCountMap : Map[Long, Long] = actionDataRDD.map(action => {(action.page_id, 1L)
}).reduceByKey(_+_).collect.toMapval sessionRDD : RDD[(String, Iterable[UserVisitAction])] = actionDataRDD.groupBy(_.session_id)val mvRDD : RDD[(String, List[((Long, Long), Int)])] = sessionRDD.mapValues(iter => {val sortList : List[UserVisitAction] = iter.toList.sortBy(_.action_time)val flowIds : List[Long] = sortList.map(_.page_id)val pageFlowIds : List[(Long, Long)] = flowIds.zip(flowIds.tail)pageFlowIds.map(t=> (t, 1))}
)// 分子
val dataRDD = mvRDD.map(_._2).flatMap(list=>list).reduceByKey(_+_)dataRDD.foreach {case((page1, page2), sum) => {val cnt : Long = pageIdToCountMap.getOrElse(page1, 0L)println(s"页面${page1}跳转页面${page2}的转换率为" + (sum.toDouble / cnt))}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/329936.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

揭秘未来,开启盲盒新篇章——打造你的专属盲盒小程序

一、引言 在这个充满未知与惊喜的时代,盲盒文化已经深入人心,成为年轻人追求新奇、体验刺激的新宠。如今,随着科技的快速发展,盲盒文化也迎来了全新的发展机遇。我们诚挚地邀请您一同踏上这场盲盒小程序开发的旅程,共…

查询一个字符串在另一个字符串中出现的次数(java)

查询一个字符串在另一个字符串中出现的次数 例: String str1“helloworld,java,python,hellokafka,world big table helloteacher”; String str2“hello”; 字符串str2在str1中出现3次 代码 package exercise.test8;public class Demo8 {public static void mai…

VLAN---虚拟局域网

通过在交换机上部署VLAN技术,将一个规模较大的广播域在逻辑上划分成若干个不同的、规模较小的广播域。 IEEE 802.1Q标准----虚拟桥接局域网标准----Dot1Q标准 VLAN-ID:标定该数据帧所属的VLAN ID信息 PC1发送的是一个无标记帧(传统的以太网…

Google Earth Engine(GEE)深度学习入门教程-Python数据读入篇

Python数据读入篇 前置条件: GEE预处理影像导出保存为tfrecord的数据包,并下载到本地tensorflow的深度学习环境 本篇文章的目的主要是把Tfrecord格式的数据加载为tf可使用的数据集格式 设定超参数 首先需要设定导出时的波段名称和数据格式&#xff…

Spring Security实现用户认证三:结合MySql数据库对用户进行认证

Spring Security实现用户认证三:结合MySql数据库对用户进行认证 1 原理2 基于内存的认证(默认方式)2.1 依赖2.2 WebSecurityConfig配置类添加配置 3 为下一步准备数据源3.1 依赖3.2 创建表users和authorities3.3 配置DruidDataSource数据源3.…

KDE-Ambari-Metrics-Collector问题排查解决手册

文档说明 本文档是为了解决KDE平台的Ambari-Metrics-Collector服务在运行时遇到的问题而提供的问题排查和解决方法的参考文档 说明: 当前的Ambari-Metrics-Collector服务包括了ams-collector和ams-hbase两个程序,在Ambari-Metrics-Collector安装的节点执行ps -elf|grep am…

【热门话题】一文带你读懂公司是如何知道张三在脉脉上发了“一句话”的

按理说呢,A公司和脉脉属于不同的平台,而且脉脉上大家可以匿名发言,所以,即便我坐在你边上,我发了一句话上去,你也不知道是谁发的。但通过一些技术,我们却可以分析出,公司是如何知道张…

2024 电工杯高校数学建模竞赛(A题)数学建模完整思路+完整代码全解全析

你是否在寻找数学建模比赛的突破点?数学建模进阶思路! 作为经验丰富的数学建模团队,我们将为你带来2024电工杯数学建模竞赛(B题)的全面解析。这个解决方案包不仅包括完整的代码实现,还有详尽的建模过程和解…

AI网络爬虫:批量爬取电视猫上面的《庆余年》分集剧情

电视猫上面有《庆余年》分集剧情&#xff0c;如何批量爬取下来呢&#xff1f; 先找到每集的链接地址&#xff0c;都在这个class"epipage clear"的div标签里面的li标签下面的a标签里面&#xff1a; <a href"/drama/Yy0wHDA/episode">1</a> 这个…

玩转OpenHarmony PID:教你打造两轮平衡车

简介 此次为大家带来的是OpenAtom OpenHarmony&#xff08;以下简称“OpenHarmony”&#xff09;系统与PID控制算法相结合并落地的平衡车项目。 PID控制算法是一种经典的&#xff0c;并被广泛应用在控制领域的算法。类似于这种&#xff1a;需要将某一个物理量保持稳定的场合&…

增强版 Kimi:AI 驱动的智能创作平台,实现一站式内容生成(图片、PPT、PDF)!

前言 基于扣子 Coze 零代码平台&#xff0c;我们从零到一轻松实现了专属 Bot 机器人的搭建。 AI 大模型&#xff08;LLM&#xff09;、智能体&#xff08;Agent&#xff09;、知识库、向量数据库、知识图谱&#xff0c;RAG&#xff0c;AGI 的不同形态愈发显现&#xff0c;如何…

Redis系统架构中各个处理模块是干什么的?no.19

Redis 系统架构 通过前面的学习&#xff0c;相信你已经掌握了 Redis 的原理、数据类型及访问协议等内容。本课时&#xff0c;我将进一步分析 Redis 的系统架构&#xff0c;重点讲解 Redis 系统架构的事件处理机制、数据管理、功能扩展、系统扩展等内容。 事件处理机制 Redis…

分布式限流总结

1、计数器 java内部可以使用原子计数器AtomicInteger\Semaphore信号量来做简单的限流 // 限流的个数private int maxCount 10;// 指定的时间内private long interval 60;// 原子类计数器private AtomicInteger atomicInteger new AtomicInteger(0);// 起始时间private lon…

【笔记】树(Tree)

一、树的基本概念 1、树的简介 之前我们都是在谈论一对一的线性数据结构&#xff0c;可现实中也有很多一对多的情况需要处理&#xff0c;所以我们就需要一种能实现一对多的数据结构--“树”。 2、树的定义 树&#xff08;Tree&#xff09;是一种非线性的数据结构&#xff0…

百度智能云参与信通院多项边缘计算标准编制,「大模型时代下云边端协同 AI 发展研讨会」成功召开

1 中国信通院联合业界制定、发布多项标准化成果&#xff0c;推动产业发展 大模型开启了 AI 原生时代&#xff0c;云边端协同 AI 构建了「集中式大规模训练」、「边缘分布式协同推理」新范式&#xff0c;有效降低推理时延和成本&#xff0c;提升数据安全和隐私性&#xff0c;也…

基于Vue3 + js-tool-big-box工具库实现3个随机数字的小游戏动画,快来挑战你的非凡手气!

不知你是否和我一样&#xff0c;我曾有一个猜3个随机数字的梦&#xff0c;但通过多次的努力&#xff0c;梦想最终未能实现&#xff0c;而且还波多了我的饭票。所以&#xff0c;我要通过vue3 js-tool-big-box 这个工具库&#xff0c;来实现一个猜3个随机数字的小游戏动画&#…

学习通高分免费刷课实操教程

文章目录 概要整体架构流程详细步骤云上全平台登录步骤小结 概要 我之前提到过一个通过浏览器的三个脚本就可以免费高分刷课的文章&#xff0c;由于不方便拍视频进行实操演示&#xff0c;然后写下了这个实操教程&#xff0c;之前的三个脚本划到文章末尾 整体架构流程 整体大…

一键批量提取TXT文档前N行,高效处理海量文本数据,省时省力新方案!

大量的文本信息充斥着我们的工作与生活。无论是研究资料、项目文档还是市场报告&#xff0c;TXT文本文档都是我们获取和整理信息的重要来源。然而&#xff0c;面对成百上千个TXT文档&#xff0c;如何快速提取所需的关键信息&#xff0c;提高工作效率&#xff0c;成为了许多人头…

我的第一个JAVA程序IDEA版

目录 第一步 新建一个空项目第二步 新建模块第三步 新建包第四步 新建类第五步 新建main方法 第一步 新建一个空项目 第二步 新建模块 第三步 新建包 第四步 新建类 然后在包文件夹下新建类 第五步 新建main方法

Java开发之JDBC

JDBC 介绍JDBC程序&#xff08;Statement&#xff09;相关细节URLResultSet 连接池程序&#xff08;PreparedStatement&#xff09; 本文主要记录一下学习JDBC的一些知识点 介绍JDBC 首先谈谈什么是JDBC。下面放几张图&#xff0c;大致就可以清楚JDBC了。程序&#xff08;Sta…