spark的使用

spark的使用

spark是一款分布式的计算框架,用于调度成百上千的服务器集群。

安装pyspark

# os.environ['PYSPARK_PYTHON']='解析器路径' pyspark_python配置解析器路径
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
pip install pyspark # 原始国外安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark  #网址安装

java安装

前置安装软件java包
java官网下载地址
一键下一步安装,配置环境变量
首先创建一个JAVA_HOME的全局变量然后在path中通过%%引入执行下面的bin 路径%JAVA_HOME%\bin

在这里插入图片描述
在这里插入图片描述
执行成功

from pyspark import SparkConf,SparkContext# 创建sparkConf 类对象
conf= SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc =SparkContext(conf=conf)
# 打印pySpark的运行脚本
print(sc.version)
# 停止sparkContext对象的运行(停止pySpark程序)
sc.stop()

PySpark的数据计算,都是基于RDD对象来进行的,RDD对象内置丰富的:成员方法(算子)

map算子

功能:map算子,是将RDD的数据一条条处理,处理的逻辑基于map算子中接收的处理函数,返回新的RDD语法:
在这里插入图片描述

# 简单执行map将数据乘以10返回,如果不引入python解析器的路径引入就会报错,
from pyspark import SparkConf, SparkContext
# 指定spark的python解析器路径
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])def func(data):return data * 10# map传入一个参数有返回值,是函数或者是值
rdd2 = rdd.map(func)
print(rdd2.collect())

在这里插入图片描述

flatMap

flatMapmap差不多就是在最后做了一个解除嵌套的功能

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['中石科技 时间还复活甲 如今房价','慰问金 咖啡机 姐夫哥','格很高 客服管家二恶烷 可归结为'])rdd2 = rdd.flatMap(lambda x:x.split(' '))print(rdd2.collect())

在这里插入图片描述
map的结果
在这里插入图片描述

reduceByKey

reduceByKey对数据进行分组可以两两计算

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('男', 11), ('男', 22), ('女', 21), ('男', 31), ('女', 99)])
# 把男女进行分组value值进行计算
rdd2 = rdd.reduceByKey(lambda a, b:a+b)print(rdd2.collect()) # [('女', 120), ('男', 64)]

reduce

与reduce的区别就是没有进行分组

take

取出前几个数据

...
rdd = sc.parallelize([1,2,3,4,5]).take(3)  # [1,2,3]

count

计算rdd中的数据个数

filter

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)rdd=sc.parallelize([1,2,3,4,5])rdd2=rdd.filter(lambda a:a%2==0) 
print(rdd2.collect()) # [2,4]

distinct

进行数据去重

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)add= sc.parallelize([1,2,3,4,5,6,73,3,2,4,56,3,5])add2=add.distinct()
print(add2.collect()) # [56, 1, 73, 2, 3, 4, 5, 6]

sortBy排序

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python3.11.4/python.exe'conf = SparkConf().setMaster('local[*]').setAppName('test_spark')
sc = SparkContext(conf=conf)add = sc.textFile('D:/wordText.txt')word_rdd = add.flatMap(lambda x: x.split(' '))
word_with_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd =word_with_rdd.reduceByKey(lambda a,b:a+b)
result_num=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1) # 1.根据什么排序,2.True 升序 False降序 3.分布式分区
print(result_num.collect())

collect

将rdd内容变成list,从而就可以打印出来

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90780.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SQLyog中导入CSV文件入库到MySQL中

1.在数据库中新建一个表,设置列名(与待导入文件一致),字段可以多出几个都可以 2.右键表名,导入- - >导入使用本地加载的CSV数据 选择使用加载本地CVS数据 3.指定好转义字符,将终止设置为,号(英文状态下…

SciencePub学术| 智能计量类重点SCIE征稿中

SciencePub学术 刊源推荐: 智能计量类重点SCIE征稿中!信息如下,录满为止: 一、期刊概况: 智能计量类重点SCIE 【期刊简介】IF:2.0-2.5,JCR3区,中科院4区; 【版面类型】正刊&#…

多维时序 | MATLAB实现ZOA-CNN-BiGRU-Attention多变量时间序列预测

多维时序 | MATLAB实现ZOA-CNN-BiGRU-Attention多变量时间序列预测 目录 多维时序 | MATLAB实现ZOA-CNN-BiGRU-Attention多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.Matlab基于ZOA-CNN-BiGRU-Attention斑马优化卷积双向门控循环单元网络…

【CSS】禁用元素鼠标事件(例如实现元素禁用效果)

文章目录 基本用法 基本用法 pointer-events 属性指定在什么情况下 (如果有) 某个特定的图形元素可以成为鼠标事件。实际运用中可以通过对auto 和none动态控制,来动态实现元素的禁用效果。 属性描述auto与pointer-events属性未指定时的表现效果相同,对…

vue3 setup+Taro3 调用原生小程序自定义年月日时分多列选择器,NutUI改造

vue3 setupTaro3 调用原生小程序自定义年月日时分多列选择器&#xff0c;NutUI改造 NutUI 有日期时间选择器&#xff0c;但是滑动效果太差&#xff0c;卡顿明显。换成 原生小程序 很顺畅 上代码&#xff1a; <template><view><pickermode"multiSelector&…

lodash常用方法笔记

_.fromPairs(pairs) 与_.toPairs正好相反&#xff1b;这个方法返回一个由键值对pairs构成的对象。 _.fromPairs([[fred, 30], [barney, 40]]); // > { fred: 30, barney: 40 }Object.fromEntries()有同样的功能&#xff0c;只是在高版本浏览器才支持&#xff1a; _toPai…

zabbix监控tomcat

一、zabbix监控Tomcat1.1 zbx-agent配置1.1.1 关闭防火墙&#xff0c;将安装 Tomcat 所需软件包传到/opt目录下1.1.2 安装JDK1.1.3 设置JDK环境变量1.1.4 安装启动Tomcat1.1.5 配置 JMX 1.2 zbx-server配置1.2.1 安装zabbix&#xff08;省略&#xff0c;可看上一篇博客&#xf…

汇编知识点之80x86指令系统

指令系统主要考虑以下几个方面&#xff1a; ①对PSW影响  影响/不影响/不定义 ②B/W  字节还是字操作 ③寻址方式 ④功能 ⑤格式 一、数据传送指令 1.通用数据传送指令 (1) MOV DST,SRC    &#xff1c;–>  (DST)<–(SRC) 注&#xff1a;1.二者不能同时为段…

ardupilot参数的mavlink实现

专业名词释义&#xff0c;参数缩写 gimbal 云台&#xff0c;万向接头 failsafe 故障保护 Collective&#xff1a; 总距 Swashplate &#xff1a; 倾斜盘 SW&#xff1a; Swashplate 倾斜盘 RSC&#xff1a; Rotor Speed Control RC&#xff1a; Radio Channel 无线通道 DDFP&am…

创建一个 React+Typescript 项目

接下来 我们来一起探索一下用TypeScript 来编写react 这也是一个非常好的趋势&#xff0c;目前也非常多人使用 那么 我们就先从创建项目开始 首先 我们先找一个 或者 之前创建一个目录 用来放我们的项目 然后 在这个目录下直接输入 例如 这里 我想创建一个叫 tsReApp 的项目…

docker部署springboot

基础知识 什么是docker 官网&#xff1a; Docker Docs: How to build, share, and run applications | Docker Documentation Docker 是一个基于go语言开发的开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到…

指针---进阶篇(二)

指针---进阶篇&#xff08;二&#xff09; 前言一、函数指针1.抛砖引玉2.如何判断函数指针&#xff1f;&#xff08;方法总结&#xff09; 二、函数指针数组1.什么是函数指针数组&#xff1f;2.讲解函数指针数组3.模拟计算器&#xff1a;讲解函数指针数组 三、指向函数指针数组…

组态王-实现语音播报告警点位信息及语音通知-语音播报器|声光报警器|工业报警方案|语音报警器|工业报警器|语音播报模块

需求简介 本文将介绍组态王如何对产生的告警实现声光语音播报&#xff0c;根据不同的告警点位&#xff0c;朗读具体的告警内容。 本文使用大连英仕博科技有限公司生产的博灵语音通知终端A4与北京亚控开发的组态王进行联动。 本文章分2部分讲解 第一部分为demo调用演示第二部…

调试 SELinux

semanage port -a -t http_port_t -p tcp 82 题目&#xff1a; 非标准端口 82 上运行的 WEB 服务器在提供内容时遇到问题。根据需要调试并解决问题&#xff0c; 并使其满足以下条件&#xff1a; 系统上的 web 服务器能够提供/var/www/html 中所有现在有的 html 文件&#xff…

无涯教程-Perl - getpwnam函数

描述 此函数基于EXPR指定的用户名,从/etc/passwd文件提取的列表context中返回字段列表。通常这样使用- ($name,$passwd,$uid,$gid,$quota,$comment,$gcos,$dir,$shell) getpwnam($user); 在标量context中,返回数字用户ID。如果尝试访问整个/etc/passwd文件,则应使用getpwent…

Python 3 使用Hadoop 3之MapReduce总结

MapReduce 运行原理 MapReduce简介 MapReduce是一种分布式计算模型&#xff0c;由Google提出&#xff0c;主要用于搜索领域&#xff0c;解决海量数据的计算问题。 MapReduce分成两个部分&#xff1a;Map&#xff08;映射&#xff09;和Reduce&#xff08;归纳&#xff09;。…

idea常见错误大全之:解决全局搜索失效+搜索条件失效(条件为空)+F8失灵

问题一&#xff1a;全局搜索快捷键ctrlshiftf 突然失灵了&#xff0c;键盘敲烂了 都没反应&#xff0c;这是为什么呢&#xff1f; 肯定不是idea本身的原因&#xff0c;那么就是其它外在因素影响到了idea的快捷键&#xff0c;那么其它的快捷键为什么没失效呢&#xff0c;原因只有…

IDEA简单拷贝一份新项目记录

IDEA简单拷贝项目记录 拷贝后改项目名&#xff0c;然后iml 配置文件改项目名&#xff0c;然后 .idea 中的compiler.xml 里面的name标签改项目名。 就可以了

ubuntu中安装python

最简单方便的是 apt 使用第三方的 ppa 源&#xff0c;然后直接 apt 安装 python3.9 安装 software-properties-common 获取add-apt-repository命令&#xff1a;apt install -y software-properties-common添加第三方的 ppa 源&#xff1a;add-apt-repository ppa:deadsnakes/p…

el-table分页后序号连续的两种方法

实现效果&#xff1a; 第一页排序到10&#xff0c;第二页的排序应从11开始 实现方法一&#xff1a; 在el-table的序号列中使用template定义 <el-table><el-table-columnmin-width"10%"label"序号"><template slot-scope"scope"…