Python学习从0到1 day26 第三阶段 Spark ④ 数据输出

半山腰太挤了,你该去山顶看看

                                        —— 24.11.10

一、输出为python对象

1.collect算子

功能:

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

语法:

rdd.collect()

返回值是一个list列表

示例:

from pyspark import SparkConf,SparkContext
import osconf = SparkConf().setMaster("local").setAppName("test_spark")
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
sc = SparkContext(conf = conf)Set = {"小明","小红","小强"}
Tuple = ("小明","小红","小强")set_rdd = sc.parallelize(Set)
tuple_rdd = sc.parallelize(Tuple)print(set_rdd.collect())
print(tuple_rdd.collect())


2.reduce算子

功能:

对RDD数据集按照你传入的逻辑进行聚合

语法:

rdd.reduce(func)rdd = sc.parallelize(range(1 , 10))
# 将rdd的数据进行累加求和
print(rdd.reduce(lambda a , b : a + b))

返回值等同于计算函数的返回值

示例:

from pyspark import SparkContext,SparkConf
import os
import jsonos.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"conf = SparkConf().setMaster("local").setAppName("test_spark")
sc = SparkContext(conf = conf)List = [1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(List)
print(rdd.reduce(lambda x, y : x + y))


3.take算子

功能:

取RDD的前N个元素,组合成list返回

语法:

sc.parallelize([3,2,1,4,5,6]).take(5)    # [3,2,1,4,5]

 返回前n个元素组成的list

示例:

from pyspark import SparkContext,SparkConf
import os
import jsonos.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
List = (1,2,3,4,5,6,7,8,9)
rdd = sc.parallelize(List)
res = rdd.take(4)
print("前四个元素为:"+res)


4.count算子

功能:

计算RDD有多少条数据

语法:

sc.parallelize([3,2,1,4,5,6]).count()

返回值是一个数字

示例:

from pyspark import SparkConf,SparkContext
import os
import jsonos.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)rdd = sc.parallelize(["yyh","hl","grq","zxj","cby","wfe","mrr","qjy"])
print(rdd.count())


二、输出到文件中

1.saveAsTextFile算子

功能:

RDD的数据写入文本文件

支持本地写出、 hdfs等文件系统

语法:

rdd = sc.parallelize([1,2,3,4,5])
rdd.saveAsTextFile("../data/output/test.txt")

2.配置Hadoop相关依赖

调用保存文件的算子,需要配置Hadoop依赖

① 下载Hadoop安装包

http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz

② 解压到电脑任意位置

③ 在Python代码中使用os模块配置:

os.environ['HADOOP HOME']='HADOOP解压文件夹路径'
E:\python.learning\hadoop分布式相关\hadoop-3.0.0

④ 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内

https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe

⑤ 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内

https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll


3.代码示例

from pyspark import SparkConf,SparkContext
import osconf = SparkConf().setMaster("local").setAppName("test_spark")
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
sc = SparkContext(conf = conf)# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])# 准备RDD2
rdd2 = sc.parallelize([("Hello, 3"),("Spark", 5),("Hi", 7)])# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output1/rdd1")
rdd2.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output2/rdd2")
rdd3.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output3/rdd3")

注:如果输出路径的文件存在,代码将会报错


4.运行结果

创建几个文件取决于Hadoop上的分区数量

解决方式:修改rdd的分区


5.修改rdd分区为1个

方式1

Sparkconf对象设置属性全局并行度为1:

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "E:/python.learning/pyt/scripts/python.exe"
os.environ['HADOOP_HOME'] = "E:\python.learning\hadoop分布式相关\hadoop-3.0.0"
conf = SparkConf().setMaster("local").setAppName("test_spark")
conf.set("spark.default.parallelize", "1")
sc = SparkContext(conf = conf)# 准备RDD1
rdd1 = sc.parallelize([1,2,3,4,5])# 准备RDD2
rdd2 = sc.parallelize([("Hello, 3"),("Spark", 5),("Hi", 7)])# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5],[6, 7, 9],[11, 13, 11]])# 输出到文件中
rdd1.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output1/rdd1")
rdd2.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output2/rdd2")
rdd3.saveAsTextFile("E:\python.learning\hadoop分布式相关\data\output3/rdd3")

方式2

创建RDD的时候设置 parallelize方法传入numSlices参数为1:

rdd1 = sc.parallelize([1,2,3,4,5],1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/472553.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】机器学习中用到的高等数学知识-1.线性代数 (Linear Algebra)

向量(Vector)和矩阵(Matrix):用于表示数据集(Dataset)和特征(Feature)。矩阵运算:加法、乘法和逆矩阵(Inverse Matrix)等,用于计算模型参数。特征值(Eigenvalues)和特征向量(Eigenvectors)&…

java项目-jenkins任务的创建和执行

参考内容: jenkins的安装部署以及全局配置 1.编译任务的general 2.源码管理 3.构建里编译打包然后copy复制jar包到运行服务器的路径 clean install -DskipTests -Pdev 中的-Pdev这个参数用于激活 Maven 项目中的特定构建配置(Profile) 在 pom.xml 文件…

【数据库取证】快速从服务器镜像文件中获取后台隐藏数据

文章关键词:电子数据取证、数据库取证、电子物证、云取证、手机取证、计算机取证、服务器取证 小编最近做了很多鉴定案件和参加相关电子数据取证比武赛,经常涉及到服务器数据库分析。现在分享一下技术方案,供各位在工作中和取证赛事中取得好成…

__VUE_PROD_HYDRATION_MISMATCH_DETAILS__ is not explicitly defined

VUE_PROD_HYDRATION_MISMATCH_DETAILS 未明确定义。您正在运行 Vue 的 esm-bundler 构建,它期望这些编译时功能标志通过捆绑器配置全局注入,以便在生产捆绑包中获得更好的tree-shaking优化。 Vue.js应用程序正在使用ESM(ECMAScript模块&#…

git撤销、回退某个commit的修改

文章目录 撤销某个特定的commit方法 1:使用 git revert方法 2:使用 git rebase -i方法 3:使用 git reset 撤销某个特定的commit 如果你要撤销某个很早之前的 commit,比如 7461f745cfd58496554bd672d52efa8b1ccf0b42,可…

Flume和kafka的整合

1、Kafka作为Source 【数据进入到kafka中,抽取出来】 在flume的conf文件夹下,有一个flumeconf 文件夹:这个文件夹是自己创建的 创建一个flume脚本文件: kafka-memory-logger.conf Flume 1.9用户手册中文版 — 可能是目前翻译最完…

JavaSE常用API-日期(计算两个日期时间差-高考倒计时)

计算两个日期时间差(高考倒计时) JDK8之前日期、时间 Date SimpleDateFormat Calender JDK8开始日期、时间 LocalDate/LocalTime/LocalDateTime ZoneId/ZoneDateTIme Instant-时间毫秒值 DateTimeFormatter Duration/Period

支持向量机SVM——基于分类问题的监督学习算法

支持向量机(SVM,Support Vector Machine)是一种常用于分类问题的监督学习算法,其核心思想是通过寻找一个最佳的超平面来将不同类别的数据点分开,从而实现分类。支持向量机广泛应用于模式识别、文本分类、图像识别等任务…

node对接ChatGpt的流式输出的配置

node对接ChatGpt的流式输出的配置 首先看一下效果 将数据用流的方式返回给客户端,这种技术需求在传统的管理项目中不多见,但是在媒体或者有实时消息等功能上就会用到,这个知识点对于前端还是很重要的。 即时你不写服务端,但是服务端如果给你这样的接口,你也得知道怎么去使用联…

SobarQube实现PDF报告导出

文章目录 前言一、插件配置二、使用步骤1.新生成一个Token2.将拷贝的Token加到上文中执行的命令中3.查看报告 三、友情提示总结 前言 这篇博文是承接此文 .Net项目在Windows中使用sonarqube进行代码质量扫描的详细操作配置 描述如何导出PDF报告 众所周知,导出PDF功…

[Codesys]常用功能块应用分享-BMOV功能块功能介绍及其使用实例说明

官方说明 功能说明 参数 类型 功能 pbyDataSrcPOINTER TO BYTE指向源数组指针uiSizeUINT要移动数据的BYTE数pbyDataDesPOINTER TO BYTE指向目标数组指针 实例应用-ST IF SYSTEM_CLOCK.AlwaysTrue THENCASE iAutoState OF0: //读写完成信号在下次读写信号的上升沿或复位信号…

sql注入之二次注入(sqlilabs-less24)

二阶注入(Second-Order Injection)是一种特殊的 SQL 注入攻击,通常发生在用户输入的数据首先被存储在数据库中,然后在后续的操作中被使用时,触发了注入漏洞。与传统的 SQL 注入(直接注入)不同&a…

springboot实现简单的数据查询接口(无实体类)

目录 前言:springboot整体架构 1、ZjGxbMapper.xml 2、ZjGxbMapper.java 3、ZjGxbService.java 4、ZjGxbController.java 5、调用接口测试数据是否正确 6、打包放到服务器即可 前言:springboot整体架构 文件架构,主要编写框选的这几类…

我的第一个PyQt5程序

PyQt5的开发环境配置完成之后,开始编写第一个PyQt5的程序。 方法一:使用将.ui转换成.py文件的方法 import sys from FirstPyQt import Ui_MainWindow from PyQt5.QtWidgets import *#QtCore,QtGui,QtWidgets # from QtTest import Ui_MainWindow#导入Q…

C++ | Leetcode C++题解之第560题和为K的子数组

题目&#xff1a; 题解&#xff1a; class Solution { public:int subarraySum(vector<int>& nums, int k) {unordered_map<int, int> mp;mp[0] 1;int count 0, pre 0;for (auto& x:nums) {pre x;if (mp.find(pre - k) ! mp.end()) {count mp[pre - …

DVWA靶场通关——SQL Injection篇

一&#xff0c;Low难度下unionget字符串select注入 1&#xff0c;首先手工注入判断是否存在SQL注入漏洞&#xff0c;输入1 这是正常回显的结果&#xff0c;再键入1 You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for…

MYSQL 精通索引【快速理解】

目录 1、什么是索引&#xff1f; 2、索引结构 1.为什么不使用二叉树呢&#xff1f; 2.B树数据结果 3.B树 4.Hash结构 3、索引语法 1.创建索引 2.查看索引 3.删除索引 4、SQL性能分析 1.SQL执行频次 2.慢查询日志 3.profile详情 4.EXPLAIN 5、索引规则 1.最左前缀法则 2.索…

【Framework系列】UnityEditor调用外部程序详解

需求介绍 之前Framework系列有介绍过导表配置工具&#xff0c;感兴趣的小伙伴可以看一看之前的文章《【Framework系列】Excel转Json&#xff0c;配置表、导表工具介绍》。由于导表工具和Unity是两个工程&#xff0c;导表工具不在Unity工程之内&#xff0c;所以在配置生成完成之…

Docker+Django项目部署-从Linux+Windows实战

一、概述 1. 什么是Docker Docker 是一个开源的应用容器引擎&#xff0c;支持在win、mac、Linux系统上进行安装。可以帮助我们在一台电脑上创建出多个隔离的环境&#xff0c;比传统的虚拟机极大的节省资源 。 为什么要创建隔离的环境&#xff1f; 假设你先在有一个centos7.…

[GXYCTF2019]BabyUpload--详细解析

信息搜集 进入界面&#xff0c;直接就是文件上传界面&#xff0c;结合题目&#xff0c;得知考察的是文件上传漏洞。 思路 文件上传漏洞&#xff0c;第一步先看有没有前端校验&#xff1a; 没有前端校验。 我们写一个一句话木马文件&#xff1a; //shell.php GIF89a <…