1.前言介绍
学习目标:了解什么是Speak、PySpark,了解为什么学习PySpark,了解课程是如何和大数据开发方向进行衔接
使用pyspark库所写出来的代码,既可以在电脑上简单运行,进行数据分析处理,又可以把代码无缝迁移到成百上千的服务器集群上去做分布式计算。
为什么要学习pyspark呢?
总结
2.基础准备
学习目标:掌握pyspark库的安装,掌握pyspark执行环境入口对象的构建,理解pyspark的编程模型。
建议使用国内代理镜像网站下载更快。
简化代码,本质上是同一个意思,链式结构,链式调用化简程序 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊
代码展示: """ 演示获取pyspark的执行环境入库对象:SparkContext 并通过SparkContext对象获取当前PySpark的版本 """# 导包 from pyspark import SparkConf,SparkContext # 创建SparkConf类对象 setMaster是描写运行模式 setAppName是设置当前Spark任务的名字 conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 同一个意思,链式结构,链式调用化简程序 # 基本原则,就是我不管调用什么方法,我的返回值都是同一个对象啊 # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 打印PySpark的运行版本 print(sc.version) # 停止SparkContext对象的运行(停止PySpark程序) sc.stop()
spark需要启动时间,所以代码的运行一小会,3.5.3就是当前spark的运行版本
这个sc非常非常重要哦,后续给大家讲解。
通过sc拿到数据输入,数据处理计算是通过RDD类对象的一系列成员方法来对数据进行计算,然后把结果对外进行输出
我们只需要记住后期写spark代码的三大步,把数据加载进来,对数据进行计算,把结果输出去
总结
3.数据输入
学习目标:理解RDD对象,掌握PySpark数据输入的2种方法。
RDD就和列表等数据容器差不多
parallelize成员方法把数据容器存入RDD对象
如果要查看RDD里面有什么内容,需要用collect()方法
字符串会把每一个字符都拆出来,存入RDD对象,字典仅有key被存入RDD对象
总结
4.数据计算
map方法
学习目标:掌握RDD的map方法
map会把传入的每一个参数都返回一个值
你会发现报错了,报错的原因是spark没有找到python解释器
给他指定一条路径,这样就没有问题了。
对于简单函数我们可以使用lambda匿名函数。
结果是一样的
链式调用
总结
flstMap方法
学习目标:掌握RDD的flatMap方法对数据进行计算。
提供map,可以看到尽管我们把数据分成一个一个的,但是还是存在嵌套,依旧被嵌套在list当中
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf)# 准备一个RDD rdd = sc.parallelize(["ikun 22","ikun 3","ik unh hhhh"]) # 需求:将RDD数据里面的一个个单词提取出来 rdd2 = rdd.flatMap(lambda x: x.split(" ")) # 使用空格进行切分 print(rdd2.collect())
使用flatMap可以解除内部嵌套,语法与map一样
总结:
reduceByKey方法
学习目标:掌握RDD的reduceByKey方法
二元元组指的是元组里面存储的只有两个元素
KV型的RDD一般是两个元素,把第一个元素当成key,第二个当成value,自动按照key分组,然后根据你传入的逻辑计算value
(v,v)->(v) 意思是传入两个相同类型的参数,返回一个返回值,类型和传入要求一致
自动分组并且组内求和
总结
可以完成按key进行分组,并且组内进行逻辑计算
练习案例1
学习目标:完成使用PySpark进行单词计数的案例
数据文件
取出所有的单词,flatMap是把单词一个一个取出来,map是把单词一行一行取出来,一行是一个列表。
把单词转换成二元元组
完整代码
""" 完成练习案例:单词计数统计 """ from pyspark import SparkConf,SparkContext import os # 1.构建执行环境入口对象 os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 2.读取数据文件 rdd = sc.textFile("D:/word.txt") # 3.取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # 4.将所有单词都转换成二元元组,单词为key,value设置为1 # (hello,1) (spark,1) (itheima,1) (itcast,1) word_with_one_rdd = word_rdd.map(lambda word: (word,1)) # 5.分组并求和 result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b) # 6.打印输出结果 print(result_rdd.collect())
filter方法
学习目标:掌握RDD的filter方法
True被保留,False被丢弃
总结
distinct方法
学习目标:掌握RDD的distinct方法
不需要传入参数,功能简单就是去重操作
总结
sortBy方法
学习目标:掌握RDD的sortBy方法进行内容的排序
接收函数传入参数并且有一个返回值
目前我们没有解除到分布式,就先写上numPartitions=1
之前写过一个读取文件,统计单词的个数,现在让我们对他进行排序
可以自己控制升序或者降序
from pyspark import SparkConf,SparkContext import os # 1.构建执行环境入口对象 os.environ['PYSPARK_PYTHON'] ="D:/python/venv/Scripts/python.exe" conf = SparkConf().setMaster("local[*]").setAppName("test_spark") sc = SparkContext(conf=conf) # 2.读取数据文件 rdd = sc.textFile("D:/word.txt") # 3.取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # 4.将所有单词都转换成二元元组,单词为key,value设置为1 # (hello,1) (spark,1) (itheima,1) (itcast,1) word_with_one_rdd = word_rdd.map(lambda word: (word,1)) # 5.分组并求和 result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b) # 6.对结果进行排序 final_rdd = result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1) print(final_rdd.collect())