大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播

在这里插入图片描述

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject// 定义全局缓存单例对象
object GlobalCache extends Serializable {// 广播变量,用于存储缓存数据private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _// 设置 SparkSession 和广播变量def setSparkSession(spark: SparkSession): Unit = {cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])}// 按订单ID和用户ID缓存JSONObject对象def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.put(generateKey(orderId, userId), jsonObject)}}// 根据订单ID和用户ID删除缓存的JSONObject对象def removeJSONObject(orderId: String, userId: String): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.remove(generateKey(orderId, userId))}}// 根据订单ID和用户ID获取缓存的JSONObject对象def getJSONObjet(orderId: String, userId: String): JSONObject = {// 获取广播变量的值并进行访问val data = cacheData.valuedata.synchronized {data.get(generateKey(orderId, userId)).orNull}}// 生成缓存键,使用订单ID和用户ID拼接private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}object CacheTest {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别def addItem(orderId:String, userId:String, name:String): Unit = {val jsonObject = new JSONObject()jsonObject.put("name", name)// 缓存JSONObject对象GlobalCache.cacheJSONObject(orderId, userId, jsonObject)}def getCache(orderId: String, userId: String): JSONObject = {// 获取缓存的JSONObject对象GlobalCache.getJSONObjet(orderId, userId)}def delItem(orderId:String, userId:String): Unit = {// 删除缓存的JSONObject对象GlobalCache.removeJSONObject(orderId, userId)}def getSparkSession(appName: String, localType: Int): SparkSession = {val builder: SparkSession.Builder = SparkSession.builder().appName(appName)if (localType == 1) {builder.master("local[8]") // 本地模式,启用8个核心}val spark = builder.getOrCreate() // 获取或创建一个新的SparkSessionspark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别spark}def main(args: Array[String]): Unit = {println("Start CacheTest")val spark: SparkSession = getSparkSession("CacheTest", 1)GlobalCache.setSparkSession(spark)  // 构造全局缓存addItem("001", "456", "苹果")      // 添加元素addItem("002", "789", "香蕉")      // 添加元素var cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")delItem("001", "456")      // 删除元素cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")spark.stop()}
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: nullProcess finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/73306.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

25.9 matlab里面的10中优化方法介绍—— 惩罚函数法求约束最优化问题(matlab程序)

1.简述 一、算法原理 1、问题引入 之前我们了解过的算法大部分都是无约束优化问题,其算法有:黄金分割法,牛顿法,拟牛顿法,共轭梯度法,单纯性法等。但在实际工程问题中,大多数优化问题都属于有约…

华为云CodeArts产品体验的心得体会及想法

文章目录 前言CodeArts 的产品优势一站式软件开发生产线研发安全Built-In华为多年研发实践能力及规范外溢高质高效敏捷交付 功能特性说明体验感受问题描述完结 前言 华为云作为一家全球领先的云计算服务提供商,致力于为企业和个人用户提供高效、安全、可靠的云服务。…

网络编程、网络编程的三要素、TCP/UDP通信、三次握手和四次挥手

🐌个人主页: 🐌 叶落闲庭 💨我的专栏:💨 c语言 数据结构 javaweb 石可破也,而不可夺坚;丹可磨也,而不可夺赤。 网络编程 一、初始网络编程1.1什么是网络编程1.2BS/CS的优…

OpenMMLab【超级视客营】——把类别信息加入可视化结果中(MMSegmentation的第二个PR)

文章目录 1. 任务说明1.0 新手指引1.1 任务目标1.2 提交格式 2. 实施2.1 可视化的形式2.2 拉分支和提交PR2.2.1 拉分支2.2.2 提交PR 2.3 MMSegmentation中关于可视化的内容2.3.1 文档说明2.3.2 相关PR(确定要修改的文件)2.3.3 提交时的代码测试 2.4 发现…

item_get-小红薯-商品详情

一、接口参数说明: item_get-获得小红薯商品详情,点击更多API调试,请移步注册API账号点击获取测试key和secret 公共参数 名称类型必须描述keyString是调用key(http://o0b.cn/iimiya)secretString是调用密钥api_nameS…

中国农村程序员学习此【ES6】购买大平层,开上帕拉梅拉,迎娶白富美出任CEO走上人生巅峰

注:最后有面试挑战,看看自己掌握了吗 文章目录 比较 var 和 let 关键字的作用域--var可能被随时覆盖-全局变量for循环全局作用域函数作用域块作用域循环作用域HTML 中的全局变量提升改变一个用 const 声明的数组防止对象改变使用箭头函数编写简洁的匿名函…

CPU密集型和IO密集型任务的权衡:如何找到最佳平衡点

关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、人工智能等,希望大家多多支持。 目录 一、导读二、概览三、CPU密集型与IO密集型3.1、CPU密集型3.2、I/O密…

list遍历添加条件同时修改元素

list遍历添加条件同时修改元素 方式一 // 遍历list同时修改元素List<Person> list new ArrayList(16);list.add(new Person("小王", 18));list.add(new Person("小三", 17));list.stream().filter(item -> item.getAge() > 17).forEach(item…

Linux系统使用(超详细)

目录 Linux操作系统简介 Linux和windows区别 Linux常见命令 Linux目录结构 Linux命令提示符 常用命令 ls cd pwd touch cat echo mkdir rm cp mv vim vim的基本使用 grep netstat Linux面试题 Linux操作系统简介 Linux操作系统是和windows操作系统是并列…

docker 安装 字体文件

先说一下我当前的 场景 及 环境&#xff0c;这样同学们可以先评估本篇文章是否有帮助。 环境&#xff1a; dockerphp8.1-fpmwindows 之所以有 php&#xff0c;是因为这个功能是使用 php 开发的&#xff0c;其他语言的同学&#xff0c;如果也有使用到 字体文件&#xff0c;那么…

uniapp echarts 点击失效

这个问题网上搜了一堆&#xff0c;有的让你降版本&#xff0c;有的让你改源码。。。都不太符合预期&#xff0c;目前我的方法可以用最新的echarts。 这个方法就是由npm安装转为CDN&#xff0c;当然你可能会质疑用CDN这样会不稳定&#xff0c;那如果CDN的地址是本地呢&#xff1…

Stable Diffusion - Stable Diffusion WebUI 支持 SDXL 1.0 模型的环境配置

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/132056980 SDXL 1.0 版本 是 Stable Diffusion 的最新版本&#xff0c;是基于潜在扩散模型的文本到图像生成技术&#xff0c;能够根据输入的任何文…

uniapp小程序自定义loding,通过状态管理配置全局使用

一、在项目中创建loding组件 在uniapp的components文件夹下创建loding组件&#xff0c;如图&#xff1a; 示例代码&#xff1a; <template><view class"loginLoading"><image src"../../static/loading.gif" class"loading-img&q…

《ChatGPT原理最佳解释,从根上理解ChatGPT》

【热点】 2022年11月30日&#xff0c;OpenAI发布ChatGPT&#xff08;全名&#xff1a;Chat Generative Pre-trained Transformer&#xff09;&#xff0c; 即聊天机器人程序 &#xff0c;开启AIGC的研究热潮。 ChatGPT是人工智能技术驱动的自然语言处理工具&#xff0c;它能够…

make/makefile的使用

make/makefile 文章目录 make/makefile初步认识makefile的工作流程依赖关系和依赖方法make的使用 总结 make是一个命令&#xff0c;是一个解释makefile中指令的命令工具&#xff0c;makefile是一个文件&#xff0c;当前目录下的文件&#xff0c;两者搭配使用&#xff0c;完成项…

6.物联网操作系统信号量

一。信号量的概念与应用 信号量定义 FreeRTOS信号量介绍 FreeRTOS信号量工作原理 1.信号量的定义 多任务环境下使用&#xff0c;用来协调多个任务正确合理使用临界资源。 2.FreeRTOS信号量介绍 Semaphore包括Binary&#xff0c;Count&#xff0c;Mutex&#xff1b; Mutex包…

【Spring Boot】单元测试

单元测试 单元测试在日常项目开发中必不可少&#xff0c;Spring Boot提供了完善的单元测试框架和工具用于测试开发的应用。接下来介绍Spring Boot为单元测试提供了哪些支持&#xff0c;以及如何在Spring Boot项目中进行单元测试。 1.Spring Boot集成单元测试 单元测试主要用…

【Python小笔记】零碎同步

1.多字段连接&#xff0c;连接字段名不一致–left_on\right_on对应列示后可匹配 import pandas as pd df_A1pd.read_excel(E:\Mercy\data\mytest\A.xlsx,sheet_name0) df_A2pd.read_excel(E:\Mercy\data\mytest\A.xlsx,sheet_name1)df_Adf_A1.merge(rightdf_A2,howleft,left_o…

spring-cache框架使用笔记

spring-cache框架使用笔记 什么是spring-cache框架 spring-cache是spring框架中的一个缓存抽象层&#xff0c; 它提供了一种简便的方式来集成不同的底层缓存实现&#xff0c; 如内存缓存(concurrentMap/ehcache/caffeine)/分布式缓存(redis/couchbase)等 它简化了在app中使用…