Spark---转换算子、行动算子、持久化算子

一、转换算子和行动算子

1、Transformations转换算子

1)、概念

Transformations类算子是一类算子(函数)叫做转换算子,如map、flatMap、reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

2)、Transformation类算子

filter :过滤符合条件的记录数,true保留,false过滤掉

map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

reduceByKey:将相同的Key根据相应的逻辑进行处理。

sortByKey/sortBy:作用在K,V格式的RDD上,对Key进行升序或者降序排序。

2、Action行动算子

1)、概念:

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

2)、Action类算子

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。

take(n):返回一个包含数据集前n个元素的集合。

first:first=take(1),返回数据集中的第一个元素。

foreach:循环遍历数据集中的每个元素,运行相应的逻辑。

collect:将计算结果回收到Driver端。

3)、demo:动态统计出现次数最多的单词个数,过滤掉。

  • 一千万条数据量的文件,过滤掉出现次数多的记录,并且其余记录按照出现次数降序排序。

假设有一个records.txt文件

hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark

代码处理:

package com.bjsxt.demo;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;
/*** 动态统计出现次数最多的单词个数,过滤掉。* @author root**/
public class Demo1 {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("demo1");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile("./records.txt");JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String t) throws Exception {return new Tuple2<String, Integer>(t, 1);}});JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}}).take(1);System.out.println("take--------"+take);JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Integer> v1) throws Exception {return !v1._1.equals(take.get(0)._1);}}).reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}});result.foreach(new VoidFunction<Tuple2<String,Integer>>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t);}});jsc.stop();}
}

3、Spark代码流程

1)、创建SparkConf对象

可以设置Application name。

可以设置运行模式。

可以设置Spark application的资源需求。

2)、创建SparkContext对象

3)、基于Spark的上下文创建一个RDD,对RDD进行处理。

4)、应用程序中要有Action类算子来触发Transformation类算子执行。

5)、关闭Spark上下文对象SparkContext。

二、Spark持久化算子

1、控制算子

1)、概念

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

2)、cache

默认将RDD的数据持久化到内存中。cache是懒执行。

注意:chche()=persist()=persist(StorageLevel.Memory_Only)

测试cache文件:

测试代码:

1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ 
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();

persist:

可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本数。

持久化级别如下:

2、cache和persist的注意事项

1)、cache和persist都是懒执行,必须有一个action类算子触发执行。

2)、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

3)、cache和persist算子后不能立即紧跟action算子。

4)、cache和persist算子持久化的数据当applilcation执行完成之后会被清除。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
  • persist(StorageLevel.DISK_ONLY)与Checkpoint的区别?

1)、checkpoint需要指定额外的目录存储数据,checkpoint数据是由外部的存储系统管理,不是Spark框架管理,当application完成之后,不会被清空。

2)、cache() 和persist() 持久化的数据是由Spark框架管理,当application完成之后,会被清空。

3)、checkpoint多用于保存状态。

  • checkpoint 的执行原理:

1)、当RDD的job执行完毕后,会从finalRDD从后往前回溯。

2)、当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。

3)、Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
  • 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/201736.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PS_魔幻

首先打开一个背景图片 然后ctrl j复制一层背景 在调整中将图片改成黑白颜色 点击调整中的 色相/饱和度 调整明度 点击画笔工具&#xff0c;并且设置画笔模板 调节画笔大小&#xff0c;将笔记本电脑涂个概况 然后再新建色相/饱和度 勾选着色 调节背景颜色至喜欢 右键混合选项 …

LeetCode 2304. 网格中的最小路径代价:DP

【LetMeFly】2304.网格中的最小路径代价&#xff1a;DP 力扣题目链接&#xff1a;https://leetcode.cn/problems/minimum-path-cost-in-a-grid/ 给你一个下标从 0 开始的整数矩阵 grid &#xff0c;矩阵大小为 m x n &#xff0c;由从 0 到 m * n - 1 的不同整数组成。你可以…

OFI libfabric原理及应用解析

Agenda 目录/议题 编译通信软件硬件和软件带来的挑战为什么需要libfabriclibfabric架构API分组socket应用 VS libfabric应用区别GPU数据传输示例 编译通信软件 可靠面向连接的TCP和无连接的数据报UDP协议高性能计算HPC或人工智能AI 软硬件复杂性带来的挑战 上千个节点的集群, …

鸿蒙4.0开发笔记之ArkTs语言基础与基本组件结构(四)

文章声明&#xff1a;本文关于HarmonyOS系统的部分内容和描述借鉴于华为官网的“HarmonyOS开发者学堂”&#xff0c;有需要的也可以进入官网查看。<HarmonyOS第一课>ArkTS开发语言介绍 一、ArkTs语言介绍 ArkTS是鸿蒙系统&#xff08;HarmonyOS&#xff09;优选的主力应…

Linux上通过SSL/TLS和start tls连接到LDAP服务器

一&#xff0c;大致流程。 1.首先在Linux上搭建一个LDAP服务器 2.在LDAP服务器上安装CA证书&#xff0c;服务器证书&#xff0c;因为SSL/TLS&#xff0c;start tls都属于机密通信&#xff0c;需要客户端和服务器都存在一个相同的证书认证双方的身份。3.安装phpldapadmin工具&am…

基于STM32的数字图像处理与模式识别算法优化

基于STM32的数字图像处理与模式识别算法优化是一项涉及图像处理和机器学习领域的研究任务&#xff0c;旨在实现高效的图像处理和模式识别算法在STM32微控制器上的运行。本文将介绍基于STM32的数字图像处理与模式识别算法优化的原理和实现步骤&#xff0c;并提供相应的代码示例。…

【追求卓越01】数据结构--数组

引导 这一章节开始&#xff0c;正式进入数据结构与算法的学习过程中。由简到难&#xff0c;先开始学习最基础的数据结构--数组。 我相信对于数组&#xff0c;大家肯定是不陌生&#xff0c;因为数组在大多数的语言中都有&#xff0c;也是大家在编程中常常会接触到的。我不会说数…

01【SpringBoot快速入门、yml语法、自动配置、整合框架】

目录 一、SpringBoot简介 1.1 Spring优缺点 1.1.1 Spring的优点 1.1.2 Spring的缺点 1.2 SpringBoot的概述 1.2.1 SpringBoot概述 1.2.2 SpringBoot的核心功能 二、SpringBoot快速入门 2.1 创建Maven工程 2.2 添加起步依赖 2.3 编写Controller 2.4 编写SpringBoot引…

c语言——俄罗斯方块

一、游戏效果 俄罗斯方块 二. 游戏背景 俄罗斯方块是久负盛名的游戏&#xff0c;它也和贪吃蛇&#xff0c;扫雷等游戏位列经典游戏的⾏列。 《俄罗斯方块》&#xff08;Tetris&#xff0c;俄文&#xff1a;Тетрис&#xff09;是一款由俄罗斯人阿列克谢帕基特诺夫于1984…

GIT实践与常用命令---回退

实践场景 场景1 回退提交 在日常工作中&#xff0c;我们可能会和多个同事在同一个分支进行开发&#xff0c;有时候我们可能会出现一些错误提交&#xff0c;这些错误提交如果想撤销&#xff0c;可以有两种解决办法:回退( reset )、反做(revert) keywords&#xff1a;reset、rev…

Android Spannable 使用​注意事项

1、当前示例中间的 "评论"&#xff0c;使用SpannableStringBuilder实现&#xff0c;点击评论会有高亮效果加粗&#xff0c;但再点击其它Bar时无法恢复默认样式。 2、因为SpannableString或SpannableStringBuilder中的效果是叠加的&#xff0c;恢复默认样式需要先移除…

创作4周年

&#x1f64c;秋名山码民的主页 &#x1f602;oi退役选手&#xff0c;Java、大数据、单片机、IoT均有所涉猎&#xff0c;热爱技术&#xff0c;技术无罪 &#x1f389;欢迎关注&#x1f50e;点赞&#x1f44d;收藏⭐️留言&#x1f4dd; 获取源码&#xff0c;添加WX 目录 前言机…

【鸿蒙应用ArkTS开发系列】- 云开发入门实战二 实现城市多级联动Demo(上)

目录 概述 云数据库开发 一、创建云数据库的对象类型。 二、预置数据&#xff08;为对象类型添加数据条目&#xff09;。 三、部署云数据库 云函数实现业务逻辑 一、创建云函数 二、云函数目录讲解 三、创建resources目录 四、获取云端凭据 五、导出之前创建的元数据…

Windows + VS2022超详细点云库(PCL1.8.1)配置

本文在结合多位CSDN大佬的步骤&#xff0c;记录以下最全的点云配置过程&#xff0c;防止走弯路&#xff08;并在最后配上PCL环境配置成功的测试代码-彩色兔子&#xff09; 一、PCL介绍 PCL概述_pcl技术_一杯盐水的博客-CSDN博客 二、准备工作&#xff08;PCL版本的下载&…

2023年危险化学品生产单位安全生产管理人员证模拟考试题库及危险化学品生产单位安全生产管理人员理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年危险化学品生产单位安全生产管理人员证模拟考试题库及危险化学品生产单位安全生产管理人员理论考试试题是由安全生产模拟考试一点通提供&#xff0c;危险化学品生产单位安全生产管理人员证模拟考试题库是根据危…

DedeBIZ 管理系统 DedeV6 v6.2.6 社区版 免费授权版

DedeBIZ 系统&#xff1a;开源、安全、高效的 DedeV6 v6.2.6 社区版 DedeBIZ 系统是基于 PHP 7 版本开发的&#xff0c;具有强大的可扩展性&#xff0c;并且完全开放源代码。它采用现流行的 Go 语言设计开发&#xff0c;不仅拥有简单易用、灵活扩展的特性&#xff0c;还具备更…

Golang版本处理Skywalking Trace上报数据

Tips: 中间记录了解决问题的过程&#xff0c;如不感兴趣可直接跳至结尾 首先去es里查询skywalking trace的元数据 可以拿到一串base64加密后的data_binary(直接解密不能用&#xff0c;会有乱码&#xff0c;可参考https://github.com/apache/skywalking/issues/7423) 对data_b…

[点云分割] 条件欧氏聚类分割

介绍 条件欧氏聚类分割是一种基于欧氏距离和条件限制的点云分割方法。它通过计算点云中点与点之间的欧氏距离&#xff0c;并结合一定的条件限制来将点云分割成不同的区域或聚类。 在条件欧氏聚类分割中&#xff0c;通常会定义以下两个条件来判断两个点是否属于同一个聚类&…

php文件上传例子

目录结构&#xff1a; index.html代码&#xff1a; <!DOCTYPE html> <html><head><title>文件上传</title><meta charset"utf-8"></head><body><form action"./up.php" method"post" encty…

CentOS 7 使用pugixml 库

安装 pugixml Git下载地址&#xff1a;https://github.com/zeux/pugixml 步骤1&#xff1a;首先&#xff0c;你需要下载pugixml 的源代码。你可以从Github或者源代码官方网站下载。并上传至/usr/local/source_code/ 步骤2&#xff1a;下载完成后&#xff0c;需要将源代码解压…