Spark累加器(Accumulator)

1.累加器类型:

  • 数值累加器:用于计算总和、计数等。
  • 布尔累加器:用于计算满足特定条件的次数。
  • 自定义累加器:允许定义复杂的聚合逻辑和数据结构。
  • 集合累加器:用于计算唯一元素的数量,处理去重操作。

    在 Spark 中,累加器(Accumulators)是一种可以用来在任务执行过程中进行累积的变量。它们主要用于计算全局的汇总值,如计数或求和。累加器是只加的变量(即只进行累加操作),并且是分布式的,适合于在多节点环境中进行汇总。

2.示例:

2.1(数值累加器):假设我们有一个包含整数的 RDD,我们希望计算这些整数的总和,并使用累加器来进行累积。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:     1.测试累加器date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext# 初始化 SparkContext
sc = SparkContext("local[*]", "测试累加器")
# 创建累加器
accumulator = sc.accumulator(0)# 定义一个函数来增加累加器的值
def add_to_accumulator(x):global accumulatoraccumulator.add(x)# 创建一个 RDD
rdd = sc.parallelize([1, 2, 3, 4])# 使用 map 来应用函数,并累加值
rdd.foreach(lambda x: add_to_accumulator(x))# 由于累加器的值在行动操作之后才会被更新,所以需要使用行动操作触发计算
rdd.count()  # 触发计算# 打印累加器的值
print("Accumulated value:", accumulator.value)

2.2(自定义累加器):自定义累加器允许你定义自己的累加逻辑和数据结构。这些累加器可以包含复杂的聚合操作和自定义数据结构。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:     4.自定义累加器测试date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParamsc = SparkContext("local[*]", "自定义累加器测试")# 自定义累加器类
class ListAccumulatorParam(AccumulatorParam):def zero(self, value):return []def addInPlace(self, acc1, acc2):return acc1 + acc2list_accumulator = sc.accumulator([], ListAccumulatorParam())def add_to_list_accumulator(x):global list_accumulatorlist_accumulator.add([x])return xrdd = sc.parallelize([1, 2, 3, 4])
rdd.foreach(lambda x: add_to_list_accumulator(x))rdd.count()  # 触发计算print("Accumulated list:", list_accumulator.value)

解释
  • 自定义累加器类ListAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空列表,addInPlace 方法合并两个列表。
  • 创建自定义累加器list_accumulator = sc.accumulator([], ListAccumulatorParam()) 创建了一个自定义的累加器实例。
  • 更新累加器add_to_list_accumulator(x) 函数将每个元素作为列表加到累加器中。
  • 应用函数rdd.foreach(lambda x: add_to_list_accumulator(x))add_to_list_accumulator 函数应用到 RDD 的每个元素。
  • 触发计算rdd.count() 触发了 RDD 的计算,更新累加器的值。
  • 查看结果list_accumulator.value 获取累加器的最终值,即累加的列表。
  • RDD 中的每个元素 [1, 2, 3, 4] 被转换为单元素列表 [1], [2], [3], [4],并分别添加到累加器中。
  • 累加器的 addInPlace 方法将这些列表合并成一个完整的列表。
2.3(集合累加器):集合累加器用于跟踪独特的元素集合,例如计算唯一元素的数量。它可以用于去重操作。
# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:     3.集合累加器测试date:          2024/7/30
-------------------------------------------------
PRODUCT:PyCharm
-------------------------------------------------
"""
from pyspark import SparkContext
from pyspark.accumulators import AccumulatorParamsc = SparkContext("local[*]", "集合累加器测试")# 自定义集合累加器类
class SetAccumulatorParam(AccumulatorParam):def zero(self, value):return set()def addInPlace(self, acc1, acc2):return acc1.union(acc2)set_accumulator = sc.accumulator(set(), SetAccumulatorParam())def add_to_set_accumulator(x):global set_accumulatorset_accumulator.add(set([x]))return xrdd = sc.parallelize([1, 2, 2, 3, 4, 4])
rdd.foreach(lambda x: add_to_set_accumulator(x))rdd.count()  # 触发计算print("Unique elements:", len(set_accumulator.value))
解释
  • 自定义累加器类SetAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空集合,addInPlace 方法合并两个集合。
  • 创建自定义累加器set_accumulator = sc.accumulator(set(), SetAccumulatorParam()) 创建了一个自定义的累加器实例。
  • 更新累加器add_to_set_accumulator(x) 函数将每个元素作为集合添加到累加器中。
  • 应用函数rdd.foreach(lambda x: add_to_set_accumulator(x))add_to_set_accumulator 函数应用到 RDD 的每个元素。
  • 触发计算rdd.count() 触发了 RDD 的计算,更新累加器的值。
  • 查看结果len(set_accumulator.value) 获取累加器的最终值,即唯一元素的数量。
  • RDD 中的元素 [1, 2, 2, 3, 4, 4] 被转换为集合形式,分别是 {1}, {2}, {2}, {3}, {4}, {4}
  • 每个元素的集合被添加到累加器中。由于累加器的合并逻辑是集合的并集,最终的累加器会包含所有唯一的元素,所以最后的计算结果是4。

3.累加器的特点:

  • 只加操作:累加器只能执行加操作,不能进行减操作或其他类型的操作。
  • 分布式支持:累加器在多节点环境下是分布式的,每个 Executor 都会在其本地更新累加器的值。最后,这些本地值会在 Driver 节点上进行合并。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/387700.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

速看!2024年5月软考通过率解析

根据湖南省工业和信息化厅最新发布的《2024年上半年软考湖南考区工作总结报告》及《考试安全顺利完成的通报》,我们了解到湖南地区在2024年上半年度的软件与信息技术专业人才考试(简称“软考”)中,报名人数达到了13,762人&#xf…

Kafka知识总结(事务+数据存储+请求模型+常见场景)

文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 文章收录在网站:http://hardyfish.top/ 事务 事务Producer保证消息写入分区的原子性,即这批消…

从零到一:用Go语言构建你的第一个Web服务

使用Go语言从零开始搭建一个Web服务,包括环境搭建、路由处理、中间件使用、JSON和表单数据处理等关键步骤,提供丰富的代码示例。 关注TechLead,复旦博士,分享云服务领域全维度开发技术。拥有10年互联网服务架构、AI产品研发经验、…

【HadoopShuffle原理剖析】基础篇二

Shuffle原理剖析 Shuffle,是指对Map输出结果进行分区、排序、合并等处理并交给Reduce的过程。分为Map端的操作和Reduce端的操作。 Shuffle过程 Map端的Shuffle Map的输出结果首先被缓存到内存,当缓存区容量到达80%(缓冲区默认100MB&#xff…

通过进程协作显示图像-C#

前言 如果一个软件比较复杂或者某些情况下需要拆解,可以考试将软件分解成两个或多个进程,但常规的消息传递又不能完全够用,使用消息共享内存,实现图像传递,当然性能这个方面我并没有测试,仅是一种解决思路…

Tekion 选择 ClickHouse Cloud 提升应用性能和指标监控

本文字数:4187;估计阅读时间:11 分钟 作者:ClickHouse team 本文在公众号【ClickHouseInc】首发 Tekion 由前 Tesla CIO Jay Vijayan 于 2016 年创立,利用大数据、人工智能和物联网等技术,为其汽车客户解决…

如何通过 CloudCanal 实现从 Kafka 到 AutoMQ 的数据迁移

01 引言 随着大数据技术的飞速发展,Apache Kafka 作为一种高吞吐量、低延迟的分布式消息系统,已经成为企业实时数据处理的核心组件。然而,随着业务的扩展和技术的发展,企业面临着不断增加的存储成本和运维复杂性问题。为了更好地…

【数据中台】大数据管理平台建设方案(原件资料)

建设大数据管理中台,按照统一的数据规范和标准体系,构建统一数据采集﹣治理﹣共享标准、统一技术开发体系、统一接口 API ,实现数据采集、平台治理,业务应用三层解耦,并按照统一标准格式提供高效的…

electron安装及快速创建

electron安装及快速创建 electron是一个使用 JavaScript、HTML 和 CSS 构建桌面应用程序的框架。 详细内容见官网:https://www.electronjs.org/zh/docs/latest/。 今天来记录下练习中的安装过程和hello world的创建。 创建项目文件夹,并执行npm 初始化命…

ubuntu安装tar安装 nginx最新版本

一、需要先安装依赖 apt install gcc libpcre3 libpcre3-dev zlib1g zlib1g-dev openssl libssl-dev 二、上传安装包 并解压 下载地址 nginx news tar xvf nginx-1.25.2.tar.gz 进入nginx cd nginx-1.25.2 三、编译 ./configure --prefix=/usr/local/nginx --with-htt…

Dolphinscheduler 3.2.1bug记录

问题1:分页只展示首页 解决方案: [Bug][API] list paging missing totalpage by Gallardot Pull Request #15619 apache/dolphinscheduler GitHub 问题2:Hive 数据源连接失败 解决方案:修改源码:HiveDataSourceProcessor.cla…

《深度RAG系列》 LLM 为什么选择了RAG

2023年是AIGC(Artificial Intelligence Generated Content)元年,这一年见证了人工智能生成内容领域的巨大飞跃,特别是大模型的爆发,它们在自然语言处理、图像生成、音频处理等多个领域展现出了惊人的能力。 这些预训练…

数据结构和算法入门

1.了解数据结构和算法 1.1 二分查找 二分查找(Binary Search)是一种在有序数组中查找特定元素的搜索算法。它的基本思想是将数组分成两半,然后比较目标值与中间元素的大小关系,从而确定应该在左半部分还是右半部分继续查找。这个…

花8000元去培训机构学习网络安全值得吗,学成后就业前景如何?

我就是从培训机构学的网络安全,线下五六个月,当时学费不到一万,目前已成功入行。所以,只要你下决心要入这一行,过程中能好好学,那这8000就花得值~ 因为只要学得好,工作两个多月就能赚回学费&am…

浅谈取样器之OS进程取样器

浅谈取样器之OS进程取样器 JMeter 的 OS 进程取样器(OSProcess Sampler)允许用户在 JMeter 测试计划中直接执行操作系统命令或脚本。这一功能对于需要集成系统级操作到性能测试场景中尤为有用,比如运行数据库备份脚本、调用系统维护命令或执…

存储引擎MyISAM和InnoDB

存储引擎:创建、查询、更新、删除 innoDB:64T、支持事物、不支持全文索引、支持缓存、支持外键、行级锁定 MyISAM:256T、不支持事物、支持全文索引、插入和查询速度快 memory:内存、不支持事物、不支持全文索引,临时…

不得不安利的程序员开发神器,太赞了!!

作为一名程序员,你是否常常为繁琐的后端服务而感到头疼?是否希望有一种工具可以帮你简化开发流程,让你专注于创意和功能开发?今天,我要向大家隆重推荐一款绝佳的开发神器——MemFire Cloud。它专为懒人开发者准备&…

KVM高级功能部署

KVM(Kernel-based Virtual Machine)是一个在Linux内核中实现的全虚拟化解决方案。除了基本的虚拟化功能外,KVM还提供了许多高级功能,以增强其性能、安全性和灵活性。以下是一些KVM的高级功能: 硬件加速: In…

基于Deap遗传算法在全量可转债上做因子挖掘(附python代码及全量因子数据)

原创文章第604篇,专注“AI量化投资、世界运行的规律、个人成长与财富自由"。 在4.x的时候,咱们分享过deap遗传算法挖掘因子的代码和数据,今天我们来升级到5.x中。 源码发布Quantlab4.2,Deap因子挖掘|gplearn做不到的咱们也…

全新微软语音合成网页版源码,短视频影视解说配音网页版系统-仿真人语音

源码介绍 最新微软语音合成网页版源码,可以用来给影视解说和短视频配音。它是TTS文本转语言,API接口和PHP源码。 这个微软语音合成接口的源码,超级简单,就几个文件搞定。用的是官方的API,试过了,合成速度…