pyspark入门基础详细讲解

1.前言介绍

学习目标:了解什么是Speak、PySpark,了解为什么学习PySpark,了解课程是如何和大数据开发方向进行衔接

使用pyspark库所写出来的代码,既可以在电脑上简单运行,进行数据分析处理,又可以把代码无缝迁移到成百上千的服务器集群上去做分布式计算。

为什么要学习pyspark呢?

总结

2.基础准备

学习目标:掌握pyspark库的安装,掌握pyspark执行环境入口对象的构建,理解pyspark的编程模型。

建议使用国内代理镜像网站下载更快。

 简化代码,本质上是同一个意思,链式结构,链式调用化简程序 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊

代码展示:
"""
演示获取pyspark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""# 导包
from pyspark import SparkConf,SparkContext
# 创建SparkConf类对象  setMaster是描写运行模式   setAppName是设置当前Spark任务的名字
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 同一个意思,链式结构,链式调用化简程序
# 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

spark需要启动时间,所以代码的运行一小会,3.5.3就是当前spark的运行版本

这个sc非常非常重要哦,后续给大家讲解。

通过sc拿到数据输入,数据处理计算是通过RDD类对象的一系列成员方法来对数据进行计算,然后把结果对外进行输出

我们只需要记住后期写spark代码的三大步,把数据加载进来,对数据进行计算,把结果输出去

总结

3.数据输入

学习目标:理解RDD对象,掌握PySpark数据输入的2种方法。

RDD就和列表等数据容器差不多

python数据容器转RDD对象

parallelize成员方法把数据容器存入RDD对象

如果要查看RDD里面有什么内容,需要用collect()方法

字符串会把每一个字符都拆出来,存入RDD对象,字典仅有key被存入RDD对象

读取文件转RDD对象

总结

4.数据计算

map方法

学习目标:掌握RDD的map方法

map会把传入的每一个参数都返回一个值

你会发现报错了,报错的原因是spark没有找到python解释器

给他指定一条路径,这样就没有问题了。如果指定路径之后还是没有解决的,可能是因为pycharm版本太新,降低版本就行了,建议是pycharm3.10

对于简单函数我们可以使用lambda匿名函数。

结果是一样的

链式调用

总结

flatMap方法

学习目标:掌握RDD的flatMap方法对数据进行计算。

通过map,可以看到尽管我们把数据分成一个一个的,但是还是存在嵌套,依旧被嵌套在list当中

当我们使用了flatMap方法后,发现解除了嵌套

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)# 准备一个RDD
rdd = sc.parallelize(["ikun 22","ikun 3","ik unh hhhh"])
# 需求:将RDD数据里面的一个个单词提取出来
rdd2 = rdd.flatMap(lambda x: x.split(" ")) # 使用空格进行切分
print(rdd2.collect())

使用flatMap可以解除内部嵌套,语法与map一样

总结:

reduceByKey方法

学习目标:掌握RDD的reduceByKey方法    

 

二元元组指的是元组里面存储的只有两个元素

KV型的RDD一般是两个元素,把第一个元素当成key,第二个当成value,自动按照key分组,然后根据你传入的逻辑计算value

(v,v)->(v)  意思是传入两个相同类型的参数,返回一个返回值,类型和传入要求一致

自动分组并且组内求和

总结

可以完成按key进行分组,并且组内进行逻辑计算

练习案例1

学习目标:完成使用PySpark进行单词计数的案例

数据文件

取出所有的单词,flatMap是把单词一个一个取出来,map是把单词一行一行取出来,一行是一个列表。

把单词转换成二元元组

完整代码

"""
完成练习案例:单词计数统计
"""
from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.打印输出结果
print(result_rdd.collect())

filter方法

学习目标:掌握RDD的filter方法

True被保留,False被丢弃

总结

distinct方法

学习目标:掌握RDD的distinct方法

不需要传入参数,功能简单就是去重操作

总结

sortBy方法

学习目标:掌握RDD的sortBy方法进行内容的排序

接收函数传入参数并且有一个返回值

目前我们没有解除到分布式,就先写上numPartitions=1

之前写过一个读取文件,统计单词的个数,现在让我们对他进行排序

可以自己控制升序或者降序,True升序,False降序

from pyspark import SparkConf,SparkContext
import os
# 1.构建执行环境入口对象
os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2.读取数据文件
rdd = sc.textFile("D:/word.txt")
# 3.取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4.将所有单词都转换成二元元组,单词为key,value设置为1
# (hello,1) (spark,1) (itheima,1) (itcast,1)
word_with_one_rdd = word_rdd.map(lambda word: (word,1))
# 5.分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
# 6.对结果进行排序
final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print(final_rdd.collect())

总结:

练习案例2

学习目标:完成练习案例2的开发

完整代码:

"""
完成练习案例:json商品统计
"""
# 1.各个城市销售额排名,从小到大
# 2.全部城市,有哪些商品类别在售卖
# 3.北京市有哪些商品类别在售卖
from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 需求1:城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/2222.txt")
# 1.2 取出一个个json字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))# 1.3 将一个个json字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'],int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a,b:a+b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1)
print("需求1的结果:",result1_rdd.collect())
# 需求2:全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:",category_rdd.collect())
# 需求3:北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:",result3_rdd.collect())

5.数据输出

输出为python对象

学习目标:掌握将RDD的结果输出为python对象的各类方法

collect算子

reduce算子

reduce和reducebykey的区别是reducebykey是获取key然后组内计算,reduce是单纯的直接计算

take算子

就是取前N个元素

count算子

总结

from pyspark import SparkConf,SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] ='D:/python/venv/Scripts/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 准备RDD
rdd = sc.parallelize([1,2,3,4,5])
# collect算子,输出RDD为list对象
rdd_list:list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两融合
num = rdd.reduce(lambda a,b: a+b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")
sc.stop()

输出到文件中

学习目标:掌握将RDD的内容输出到文件中,了解如何更改RDD的分区数为1

报错了,原因是配置的问题,接下来我们给他配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/469004.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

权限管理练习2

1.在/home中创建一个名为 file1.txt 的文件,并设置权限为:所有者和组成员可以读写,但其他人只能读。 所有者和组成员可以读写 u rw- g rw- o r-- 2.在 /home 目录下创建一个名为 shared 的子目录,使得所有用户都可以进入&#…

面试经典 150 题:121,125

121. 买卖股票的最佳时机 【参考代码】 动态规划解决 class Solution { public:int maxProfit(vector<int>& prices) {int size prices.size();int min_price 99999, max_profit 0;for(int i0; i<size; i){if(prices[i] < min_price){min_price prices[i…

数据集划分

1、 sklearn玩具数据集介绍 数据量小&#xff0c;数据在sklearn库的本地&#xff0c;只要安装了sklearn&#xff0c;不用上网就可以获取 2 sklearn现实世界数据集介绍 数据量大&#xff0c;数据只能通过网络获取&#xff08;科学上网&#xff09; 3 sklearn加载玩具数据集 示…

图形几何之美系列:仿射变换矩阵之先转后偏

“在几何计算、图形渲染、动画、游戏开发等领域&#xff0c;常需要进行元素的平移、旋转、缩放等操作&#xff0c;一种广泛应用且简便的方法是使用仿射变换进行处理。相关的概念还有欧拉角、四元数等&#xff0c;四元数在图形学中主要用于解决旋转问题&#xff0c;特别是在三维…

刷题强训(day05) -- 游游的you、腐烂的苹果、孩子们的游戏(圆圈中最后剩下的数)

目录 1、游游的you 1.1 题目 1.2 思路 1.3 代码实现 2、腐烂的苹果 2.1 题目 2.2 思路 2.3 代码实现 3、孩子们的游戏(圆圈中最后剩下的数) 3.1 题目 3.2 思路 3.3 代码实现 3.3.1 环形链表 ​编辑3.3.2 动态规划 ​编辑 1、游游的you 1.1 题目 1.2 思路 根据题…

PyQt5超详细教程终篇

PyQt5超详细教程 前言 接&#xff1a; [【Python篇】PyQt5 超详细教程——由入门到精通&#xff08;序篇&#xff09;](【Python篇】PyQt5 超详细教程——由入门到精通&#xff08;序篇&#xff09;-CSDN博客) 建议把代码复制到pycahrm等IDE上面看实际效果&#xff0c;方便理…

并查集算法实现

模板 模板分为三大部分 初始化查询i的祖先合并i j(使他们祖先成为一个人) // 1 初始化 void init(int n) {for (int i 1; i < n; i)fa[i] i;//将该数的父节点定义为该数 }// 2 查询i的祖先 int find(int i) {if (i fa[i])return i;else{![查](../pic/并查集.png)fa[i]…

(实战)WebApi第13讲:怎么把不同表里的东西,包括同一个表里面不同的列设置成不同的实体,所有的给整合到一起?【前端+后端】、前端中点击标签后在界面中显示

一、实现全局跨域&#xff1a;新建一个Controller&#xff0c;其它的controller都继承它 1、新建BaseController 2、在后端配置&#xff0c;此处省略【详情见第12讲四、3、】 3、其它的控制器继承BaseController&#xff0c;这个时候就能够完成全局的跨域 【向后台传cookie和…

【计算机基础——数据结构——红黑树】

1. 红黑树&#xff08;RBTree&#xff09; 为什么HashMap不直接使用AVL树&#xff0c;而是选择了红黑树呢&#xff1f; 由于AVL树必须保证左右子树平衡&#xff0c;Max(最大树高-最小树高) < 1&#xff0c;所以在插入的时候很容易出现不平衡的情况&#xff0c;一旦这样&…

【MatLab手记】 --从0到了解超超超详过程!!!

文章目录 MatLab笔记一、命令行窗口二、变量命名规则三、数据类型1. 数字2. 字符与字符串3. 矩阵3.1 矩阵创建3.2 矩阵的修改和删除3.3 矩阵的拼接与重构重排3.4 矩阵的运算方法3.5 矩阵的下标 4. 元胞数组&#xff08;类似数据容器&#xff09;5. 结构体 四、逻辑与流程控制五…

Qt_day5_常用类

常用类 目录 1. QString 字符串类&#xff08;掌握&#xff09; 2. 容器类&#xff08;掌握&#xff09; 2.1 顺序容器QList 2.2 关联容器QMap 3. 几种Qt数据类型&#xff08;熟悉&#xff09; 3.1 跨平台数据类型 3.2 QVariant 统一数据类型 3.3 QStringList 字符串列表 4. QD…

【THM】linux取证 DisGruntled

目录 0x00 房间介绍 0x01 连接并简单排查 0x02 让我们看看做没做坏事 0x03 炸弹已埋下。但何时何地&#xff1f; 0x04 收尾 0x05 结论 0x00 房间介绍 嘿&#xff0c;孩子&#xff01;太好了&#xff0c;你来了&#xff01; 不知道您是否看过这则新闻&#xff0c;我…

MFC中Excel的导入以及使用步骤

参考地址 在需要对EXCEL表进行操作的类中添加以下头文件&#xff1a;若出现大量错误将其放入stdafx.h中 #include "resource.h" // 主符号 #include "CWorkbook.h" //单个工作簿 #include "CRange.h" //区域类&#xff0c;对Excel大…

智能化温室大棚控制系统设计(论文+源码)

1 系统的功能及方案设计 本次智能化温室大棚控制系统的设计其系统整体结构如图2.1所示&#xff0c;整个系统在器件上包括了主控制器STC89C52&#xff0c;温湿度传感器DHT11&#xff0c;LCD1602液晶&#xff0c;继电器&#xff0c;CO2传感器&#xff0c;光敏电阻&#xff0c;按…

一篇文章教会你使用Linux的‘sed‘基础命令

Linux sed 命令详解 Linux sed 命令详解1、基本语法2、常用命令2.1 替换2.2 删除行2.3 查找并打印行2.4 插入与追加2.5 多命令组合 3、高级用法3.1 替换并保存结果到新文件3.2 在范围内替换3.3 正则表达式匹配 4、小结 Linux sed 命令详解 sed 是 Linux 系统中非常强大的流编辑…

集群化消息服务解决方案

目录 集群化消息服务解决方案项目概述架构图使用说明服务端通过API接口推送消息给客户端调用方式 请求参数返回参数 客户端推送消息连接websocket或发送消息 接收消息项目地址作者信息 集群化消息服务解决方案 项目概述 集群化消息服务解决方案是一种用于处理大量消息的高可用…

elementUI 点击弹出时间 date-picker

elementUI的日期组件&#xff0c;有完整的UI样式及弹窗&#xff0c;但是我的页面不要它的UI样式&#xff0c;点击的时候却要弹出类似的日期选择器&#xff0c;那怎么办呢&#xff1f; 以下是elementUI自带的UI风格&#xff0c;一定要一个输入框来触发。 这是我的项目中要用到的…

【go从零单排】go中的三种数据类型array、slices、maps

Don’t worry , just coding! 内耗与overthinking只会削弱你的精力&#xff0c;虚度你的光阴&#xff0c;每天迈出一小步&#xff0c;回头时发现已经走了很远。 array数组 package mainimport "fmt"func main() {var a [5]int //var关键字定义数组&#xff0c;[5]表…

科技改变阅读习惯:最新研究揭示电子阅读器的普及趋势

据QYResearch调研团队最新报告“全球电子阅读器市场报告2023-2029”显示&#xff0c;预计2029年全球电子阅读器市场规模将达到6.9亿美元&#xff0c;未来几年年复合增长率CAGR为0.4%。 如上图表/数据&#xff0c;摘自QYResearch最新报告“全球电子阅读器市场研究报告2023-2029.…

解决 VSCode 中 C/C++ 编码乱码问题的两种方法

解决 VSCode 中 C/C 编码乱码问题的两种方法 在中国地区&#xff0c;Windows 系统中的 cmd 和 PowerShell 默认编码是 GBK&#xff0c;但 VSCode 默认使用 UTF-8 编码。这种编码不一致会导致在 VSCode 终端中运行 C/C 程序时出现乱码。以下介绍两种方法来解决这一问题。 方法…