文章目录
- day06_Spark SQL课程笔记
- 一、今日课程内容
- 二、DataFrame详解(掌握)
- 5.清洗相关的API
- 6.Spark SQL的Shuffle分区设置
- 7.数据写出操作
- 写出到文件
- 写出到数据库
- 三、Spark SQL的综合案例(掌握)
- 1、常见DSL代码整理
- 2、电影分析案例
- 需求说明:
- 需求分析:
- 四、Spark SQL函数定义
- 1、窗口函数
- 2、SQL函数分类
- 3、Spark原生自定义UDF函数
- 01_数据清洗的dropDuplicates函数.py
- dropDuplicates ==行来看,保留第一个重复值,有subset也是==
- 结果
- 02_数据清洗的dropna函数.py
- dropna ==行来看,how默认=any行中有空就删行,how='all'行中全部都是空才删行,subset=['name']列中有空删行,thresh=3最少有三个 正常值(非空值)==
- 结果
- 03_数据清洗的fillna函数.py
- fillna ==列来看,value用指定值填充空值==
- 结果
- 04_清洗后数据存储到文件.py
- 结果
- 05_清洗后数据存储到mysql数据库.py
- 结果
- 07_清洗后数据存储到文件_优化分区数.py
- 结果
- 08_电影数据分析案例.py
- 09_sparkSQL中应用开窗函数.py
day06_Spark SQL课程笔记
一、今日课程内容
- 1- DataFrame详解(掌握)
- 2- Spark SQL的综合案例(掌握)
- 3- Spark SQL函数定义(掌握)
今日目的:掌握DataFrame详解
二、DataFrame详解(掌握)
5.清洗相关的API
API是什么
简单来说:API(应用程序编程接口)就像是“软件之间的翻译官”,定义了不同系统或组件之间如何交互和通信,让开发者能够更方便地使用外部功能或服务。
具体而言:
- 定义:API是一组预定义的函数、协议和工具,用于构建软件应用程序。它规定了如何请求服务、传递数据以及接收结果。
- 类型:
- Web API:基于HTTP协议,用于Web服务之间的通信,如RESTful API、GraphQL。
- 库API:编程语言提供的函数库,如Python的NumPy、Pandas。
- 操作系统API:操作系统提供的接口,如Windows API、Linux系统调用。
- 工作原理:
- 请求:客户端发送请求(如HTTP请求)到API。
- 处理:服务器接收请求并处理。
- 响应:服务器返回结果(如JSON数据)给客户端。
实际生产场景:
- 在移动应用中,使用Web API获取天气数据或支付服务。
- 在数据分析中,使用Python库API(如Pandas)处理数据。
- 在系统开发中,使用操作系统API管理文件或进程。
总之:API是软件开发的基石,通过标准化接口简化了系统间的交互,提高了开发效率和代码复用性。
总结:
1- dropDuplicates(subset):用来删除重复数据。1.1- 如果没有指定参数subset,那么要比对行中的所有字段内容,如果全部相同,就认为是重复数据,会被删除;1.2- 如果有指定参数subset,那么只比对subset中指定的字段范围内删除重复数据2- dropna(thresh,subset):删除缺失值数据.2.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据2.2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内2.3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh个字段的值不为空,才不会被删除3- fillna(value,subset): 替换缺失值数据3.1- value: 必须要传递参数.是用来填充缺失值的,默认填充所有的缺失值3.1- subset: 如果有指定参数subset,那么只比对subset中指定的字段范围内替换注意:value最常用的是传递字典的形式
代码演示:
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def demo1_dropDuplicates():# dropDuplicates: 如果有重复就会去重,保留第一个,当然也可以指定参考的列df.dropDuplicates().show()df.dropDuplicates(['name']).show()def demo2_dropna():# dropna: 默认去除带null每行数据,当然也可以指定参考的列df.dropna().show()df.dropna(thresh=2).show()df.dropna(thresh=1, subset=['id', 'name']).show()def demo3_fillna():# fillna: 默认只要带null就补充指定内容,当然也可以指定参考列df.fillna('空').show()df.fillna('空', subset=['address']).show()df.fillna({'name': '未知', 'address': '广州'}).show()# 创建main函数
if __name__ == '__main__':# 1.创建spark对象# appName:应用程序名称 master:提交模式# getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的spark = SparkSession.builder.appName('sparksql_demo').master('local[*]').getOrCreate()# 2.通过read读取外部文件方式创建DF对象df = spark.read \.format('csv') \.option('header', True) \.load('file:///export/data/spark_project/spark_sql/data/clear_data.csv')# 3.show直接展示df.show()# 4.清洗数据# dropDuplicates: 如果有重复就会去重,保留第一个,当然也可以指定参考的列# demo1_dropDuplicates()print('------------------------------------------------------')# dropna: 默认去除带null每行数据,当然也可以指定参考的列# demo2_dropna()print('------------------------------------------------------')# fillna: 默认只要带null就补充指定内容,当然也可以指定参考列demo3_fillna()# 5.释放资源spark.stop()
6.Spark SQL的Shuffle分区设置
补充:
如果运行sparksql,发现Shuffle分区每次都是1,或者后续count_distinct找不到,那么是因为pyspark版本原因导致。解决办法如下:
1- 检查自己3台机器的pyspark版本是否是3.1.2版本
pip list | grep pyspark
2-如果不是3.1.2版本,那么先卸载pyspark
命令: pip uninstall pyspark
3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark
命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.1.2
Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行
Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小
如何调整shuffle分区数量呢? spark.sql.shuffle.partitions
方案一(不推荐): 直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为: spark.sql.shuffle.partitions 20方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候./spark-submit --conf "spark.sql.shuffle.partitions=20"方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。
设置shuffle分区的数量方式1:SparkSession.builder.config('spark.sql.shuffle.partitions', 1)
设置shuffle分区的数量方式2:spark.conf.set("spark.sql.shuffle.partitions",'1')获取shuffle分区的数量:spark.conf.get("spark.sql.shuffle.partitions")
# 导包
import os
import time
from pyspark.sql import SparkSession, functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 7.1 TODO: 记录程序开始时间start = time.time()# 1.创建spark对象# appName:应用程序名称 master:提交模式# getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的spark = SparkSession.builder \.config('spark.sql.shuffle.partitions', 1)\.appName('sparksql_demo')\.master('local[*]')\.getOrCreate()# 获取shuffle分区的数量shuffle_partitions = spark.conf.get("spark.sql.shuffle.partitions")print("Shuffle partitions:", shuffle_partitions)# 2.通过read读取外部文件方式创建DF对象df = spark.read \.format('text') \.schema('words string') \.load('file:///export/data/spark_project/spark_sql/data/data3.txt')print(type(df))# 需求: 从data3.txt读取所有单词,然后统计每个单词出现的次数# 3.SQL风格# 方式1: 使用子查询方式# 先创建临时视图,然后通过sql语句查询展示df.createTempView('words_tb')qdf = spark.sql("select words,count(1) as cnt from (select explode(split(words,' ')) as words from words_tb) t group by words")print(type(qdf))qdf.show()# 4.DSL风格# 方式2: 分组后用agg函数df.select(F.explode(F.split('words', ' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt')).show()# 7.2 TODO:记录结束时间end = time.time()# 7.3 计算运行时间# 结论: 合理设置分区数,效率会提高!t = end - startprint(f"程序运行了{t}秒")# 6.TODO:为了方便查看web页面可以让程序多睡会儿# time.sleep(1000)# 5.释放资源spark.stop()
7.数据写出操作
写出到文件
统一的输出语法:
对应的简写API格式如下,以CSV为例:
init_df.write.csv(path='存储路径',mode='模式',header=True,sep='\001',encoding='UTF-8'
)
常用参数说明:1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统2- mode:当输出目录中文件已经存在的时候处理办法2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path file:xxx already exists.3- sep:字段间的分隔符4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True5- encoding:文件输出的编码方式
- 演示1: 输出到文件中 json csv orc text …
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()spark.conf.set("spark.sql.shuffle.partitions", '1')# 2.数据输入df = spark.read\.format('csv')\.option('header',True)\.load('file:///export/data/spark_project/spark_sql/data/clear_data.csv')# 3.数据处理(切分,转换,分组聚合)etldf = df.dropDuplicates().dropna()etldf.show()# 4.数据输出# 原始APIetldf.write\.format('csv')\.option('sep',',')\.option('header',True)\.mode('overwrite')\.save('file:///export/data/spark_project/spark_sql/data/output')# 简化APIetldf.write.csv(sep=',',header=True,mode='overwrite',path='file:///export/data/spark_project/spark_sql/data/output2')# 5.关闭资源spark.stop()
写出到数据库
- 将结果数据基于JDBC方案, 输出到关系型数据库, 例如说: MySql
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("数据输出到数据库")# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions","1")\.appName('sparksql_database')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',sep=' ',encoding='UTF-8',header="True",inferSchema=True)# 3- 数据处理result = init_df.where('age>=20')# 4- 数据输出result.show()result.printSchema()# 数据输出到数据"""创建数据库命令:create database day06 character set utf8;"""result.write.jdbc(url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',table='student',mode='append',sql={ 'user' : 'root', 'password' : '123456' })# 5- 释放资源spark.stop()
运行结果截图:
可能出现的错误一:
etldf.write.jdbc(url='jdbc:mysql://node1:3306/库名',table='表名',mode='append',# 解决方法: 给密码123456加上引号sql={'user':'root','password':'123456'})
可能出现的错误 二:
原因: 缺少连接MySQL数据库的驱动
解决方法如下:
数据库的驱动包, 一般都是一些Jar包,放置【mysql-connector-java-5.1.41.jar】驱动包到以下位置:1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下: hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案: spark-submit --jars ....
可能出现的错误三:
原因:将中文输出到了数据表中
解决办法:
1- 数据库连接要加上: ?useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码: character set utf8
# 最终连接数据代码如下:etldf.write.jdbc(url='jdbc:mysql://node1:3306/spark_db1?useUnicode=true&characterEncoding=utf-8',table='student',mode='append',sql={'user':'root','password':'123456'})
三、Spark SQL的综合案例(掌握)
1、常见DSL代码整理
分类 | 格式 | 含义 | 示例 |
---|---|---|---|
API/方法 | select | 查询字段 | select(‘id1’, ‘id2’) |
where | 对数据过滤 | where(‘avg_score>3’) | |
groupBy | 对数据分组 | groupBy(‘userid’) | |
orderBy | 对数据排序 | orderBy(‘cnt’, ascending=False) | |
limit | 取前几条数据 | orderBy(‘cnt’, ascending=False).limit(1) | |
agg | 聚合操作,里面可以写多个聚合表达式 | agg(F.round(F.avg(‘score’), 2).alias(‘avg_score’)) | |
show | 打印数据 | init_df.show() | |
printSchema | 打印数据的schema信息,也就是元数据信息 | init_df.printSchema() | |
alias | 对字段取别名 | F.count(‘movieid’).alias(‘cnt’) | |
join | 关联2个DataFrame | etl_df.join(avg_score_dsl_df, ‘movieid’) | |
withColumn | 基于目前的数据产生一个新列 | init_df.withColumn(‘word’,F.explode(F.split(‘value’, ’ '))) | |
dropDuplicates | 删除重复数据 | init_df.dropDuplicates(subset=[“id”,“name”]) | |
dropna | 删除缺失值 | init_df.dropna(thresh=2,subset=[“name”,“age”,“address”]) | |
fillna | 替换缺失值 | init_df.fillna(value={“id”:111,“name”:“未知姓名”,“age”:100,“address”:“北京”}) | |
first | 取DataFrame中的第一行数据 | ||
over | 创建一个窗口列 | ||
窗口 | partitionBy | 对数据分区 | |
orderBy | 对数据排序 | orderBy(F.desc(‘pv’)) | |
函数 | row_number | 行号。从1开始编号 | |
desc | 降序排序 | ||
avg | 计算均值 | ||
count | 计数 | ||
round | 保留小数位 | ||
col | 将字段包装成Column对象,一般用于对新列的包装 | ||
1- 什么使用使用select(),什么时候使用groupBy()+agg()/select()实现聚合?:如果不需要对数据分组,那么可以直接使用select()实现聚合;如果有分组操作,需要使用groupBy()+agg()/select(),推荐使用agg()2- first(): 如果某个DataFrame中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较3- F.col(): 对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象,然后去使用
- API/方法:是由DataFrame来调用
- 函数:需要先通过import pyspark.sql.functions as F导入,使用F调用。Spark SQL内置提供的函数https://spark.apache.org/docs/3.1.2/api/sql/index.html
- 窗口:需要先通过from pyspark.sql import Window导入
2、电影分析案例
需求说明:
数据集的介绍:
数据说明 : userid,movieid,score,datestr字段的分隔符号为: \t
需求分析:
-
需求一: 查询用户平均分
需求分析:
维度:用户
指标:平均分 -
需求二: 查询每部电影的平均分(课后作业,将自己对需求的分析步骤以文字的形式放在代码中)
-
需求三: 查询大于平均分的电影的数量
需求分析:
1- 统计所有打分的平均分,这个结果就是一个数字
2- 统计每部电影各自的平均分
3- 查询大于平均分的电影的数量
-
需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少
需求分析:
1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息
2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户
3- 统计该用户所有打分记录的平均分
-
需求五: 查询每个用户的平均打分, 最低打分, 最高打分(课后作业)
-
需求六: 查询被评分超过100次的电影的平均分 排名 TOP10(课后作业)
一三四需求实现代码:
# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def demo1_get_user_avg_score():# 方式1: SQL方式spark.sql('select userid,round(avg(score),3) as user_avg_score from movie group by userid').show()# 方式2: DSL方式etldf.groupBy('userid').agg(F.round(F.avg('score'), 3).alias('user_avg_score')).show()def demo2_get_lag_avg_movie_cnt():# 方式1: SQLspark.sql("""select count(1) as cnt from (select movieid,avg(score) as movie_avg_score from movie group by movieidhaving movie_avg_score > (select avg(score) as all_avg_score from movie) ) t""").show()# 方式2: DSL# col():把临时结果作为新列使用 first():取第一个值etldf.groupBy('movieid').agg(F.avg('score').alias('movie_avg_score')).where(F.col('movie_avg_score') > etldf.select(F.avg('score').alias('all_avg_score')).first()['all_avg_score']).agg(F.count('movieid').alias('cnt')).show()def demo3_get_top1_user_avg_sql():# 方式1: SQL# ①先查询高分电影:spark.sql("select movieid,avg(score) as movie_avg_score from movie group by movieid having movie_avg_score > 3").createTempView('hight_score_tb')# ②再求打分次数最多的用户(先不考虑并列,只取最大1个)spark.sql("select userid,count(1) as cnt from hight_score_tb h join movie m on h.movieid = m.movieid group by userid order by cnt desc limit 1").createTempView('top1_user_tb')# ③最后求此人所有打分的平均分spark.sql("select avg(score) as top1_user_avg from movie where userid = (select userid from top1_user_tb)").show()def demo3_get_top1_user_avg_dsl():# ①先查询高分电影:hight_score_df = etldf.groupBy('movieid').agg(F.avg('score').alias('movie_avg_score')).where('movie_avg_score>3')# ②再求打分次数最多的用户(先不考虑并列,只取最大1个)top1_user_df = hight_score_df.join(etldf, on=hight_score_df['movieid'] == etldf['movieid']) \.groupBy('userid').agg(F.count('userid').alias('cnt')) \.orderBy('cnt', ascending=False).limit(1)# ③最后求此人所有打分的平均分etldf.where(etldf['userid'] == top1_user_df.first()['userid']).agg(F.avg('score').alias('top1_user_avg')).show()# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read.csv(schema='userid string,movieid string,score int,datestr string',sep='\t',path='file:///export/data/spark_project/spark_sql/data/u.data')print(df.count())# 3.数据处理(切分,转换,分组聚合)etldf = df.dropDuplicates().dropna()print(etldf.count())# 4.数据分析# 方便后续所有SQL方式使用,提前创建临时视图作为表etldf.createTempView('movie')# 需求1: 查询用户的平均分# demo1_get_user_avg_score()# 需求3: 查询大于平均分的电影的数量# demo2_get_lag_avg_movie_cnt()# 需求4: 查询高分电影(平均分>3)中,打分次数最多的用户,并求出此人所有打分的平均分# 方式1: SQLdemo3_get_top1_user_avg_sql()# 方式2: DSLdemo3_get_top1_user_avg_dsl()# 5.数据输出# 6.关闭资源spark.stop()
附录: 问题
可能出现的错误一:
原因: 是使用withColumn产生新列,但是表达式中有聚合的操作。缺少groupBy调用
可能出现的错误二:
错误原因:DataFrame结果是单行的情况,列值获取错误
解决办法:
将df_total_avg_score['total_avg_score']改成df_total_avg_score.first()['total_avg_score']
可能遇到的错误三:
原因:对于在计算过程中临时产生的字段,需要使用F.col封装成Column对象
解决办法:F.col(‘avg_score’)
四、Spark SQL函数定义
1、窗口函数
回顾之前学习过的窗口函数:
分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: row_number() rank() dense_rank() ntile()
3- 第三类: first_value() last_value() lead() lag()
在Spark SQL中使用窗口函数案例:
需求是找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('sparksql_win_function')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/gz16_pyspark/02_spark_sql/data/cookie.txt',schema='cookie string,datestr string,pv int',sep=',',encoding='UTF-8')init_df.createTempView('win_data')init_df.show()init_df.printSchema()# 3- 数据处理# SQLspark.sql("""select cookie,datestr,pvfrom (selectcookie,datestr,pv,row_number() over (partition by cookie order by pv desc) as rnfrom win_data) tmp where rn<=3""").show()# DSL"""select:注意点,结果中需要看到哪几个字段,就要明确写出来"""init_df.select("cookie","datestr","pv",F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')).where('rn<=3').select("cookie","datestr","pv").show()# 4- 数据输出# 5- 释放资源spark.stop()
运行结果截图:
2、SQL函数分类
SQL函数,主要分为以下三大类:
- UDF函数:用户自定义函数
- 特点:一对一,输入一个得到一个
- 例如:split() substr()
- 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
- UDAF函数:用户自定义聚合函数
- 特点:多对一,输入多个得到一个
- 例如:sum() avg() count() min()
- UDTF函数:用户自定义表数据生成函数
- 特点:一对多,输入一个得到多个
- 例如:explode()
简单来说:SQL函数中的UDF、UDAF和UDTF就像是“数据处理的三剑客”,分别擅长“单点突破”、“团队协作”和“多点开花”,满足不同的数据处理需求。
具体而言:
- UDF(用户自定义函数):
- 比喻:像是“单兵作战”,一对一处理数据。
- 特点:输入一个值,输出一个值。
- 示例:
split()
(字符串分割)、substr()
(字符串截取)。- 实现:在Hive中继承
UDF
类,实现evaluate
方法。- UDAF(用户自定义聚合函数):
- 比喻:像是“团队协作”,多对一聚合数据。
- 特点:输入多行数据,输出一个聚合结果。
- 示例:
sum()
(求和)、avg()
(平均值)、count()
(计数)。- 实现:在Hive中继承
UDAF
类,实现init()
、iterate()
、merge()
等方法。- UDTF(用户自定义表生成函数):
- 比喻:像是“多点开花”,一对多展开数据。
- 特点:输入一行数据,输出多行数据。
- 示例:
explode()
(将数组或Map展开为多行)。- 实现:在Hive中继承
GenericUDTF
类,实现initialize()
、process()
等方法。实际生产场景:
- 在数据清洗中,使用UDF对字符串进行格式化处理。
- 在数据分析中,使用UDAF计算复杂的业务指标,如加权平均值。
- 在数据展开中,使用UDTF将嵌套的JSON数据拆分为多行,便于后续分析。
总之:UDF、UDAF和UDTF是SQL中强大的扩展工具,分别适用于“一对一”、“多对一”和“一对多”的数据处理场景,为业务需求提供了灵活的支持。
在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数
思考:有这么多的内置函数,为啥还需要自定义函数呢?
为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数
1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件,Python可以开发UDF、UDAF函数
在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。
在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。
Spark SQL原生存在的问题:大量的序列化和反序列
虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作
3、Spark原生自定义UDF函数
自定义函数流程:
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用注意: 如果通过方式一来注册函数, 【可以用在SQL和DSL】注册方式二: udf对象 = F.udf(参数1,参数2)参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用注意: 如果通过方式二来注册函数,【仅能用在DSL中】注册方式三: 语法糖写法 @F.udf(returnType=返回值类型) 放置到对应Python的函数上面说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
自定义演示一:
请自定义一个函数,完成对数据统一添加一个后缀名的操作
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
from pyspark.sql.types import StringTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("请自定义一个函数,完成对数据统一添加一个后缀名的操作_itheima")# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_udf_basetype')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.createDataFrame(data=[(1,'张三','广州'),(2,'李四','深圳')],schema='id int,name string,address string')init_df.printSchema()init_df.show()init_df.createTempView('tmp')# 3- 数据处理# 3.1- 创建自定义的Python函数def add_suffix(address):return address + "_itheima"# 3.2- 将Python函数注册到Spark SQL# 注册方式一dsl_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())# 3.3- 在SQL/DSL中调用# SQLspark.sql("""selectid,name,address,sql_add_suffix(address) as new_addressfrom tmp""").show()# DSLinit_df.select("id","name","address",dsl_add_suffix("address").alias("new_address")).show()print("-"*30)# 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。# spark.sql("""# select# id,name,address,# dsl_add_suffix(address) as new_address# from tmp# """).show()# 注册方式二:UDF返回值类型传值方式一dsl2_add_suffix = F.udf(add_suffix,StringType())# DSLinit_df.select("id","name","address",dsl2_add_suffix("address").alias("new_address")).show()# 注册方式二:UDF返回值类型传值方式二dsl3_add_suffix = F.udf(add_suffix, 'string')# DSLinit_df.select("id","name","address",dsl3_add_suffix("address").alias("new_address")).show()# 注册方式三:语法糖/装饰器@F.udf(returnType=StringType())def add_suffix_candy(address):return address + "_itheima"# DSLinit_df.select("id","name","address",add_suffix_candy("address").alias("new_address")).show()# 4- 数据输出# 5- 释放资源spark.stop()
运行结果截图:
可能遇到的问题:
原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。
自定义演示二:
自定义UDF函数,让其返回值类型为复杂类型: 字典 列表 元组
1- 返回列表和元组,表现一致
2- 返回字典。需要注意,字典中的key需要和schema中的字段名保持一致,否则获取不到值,以null填充
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
from pyspark.sql.types import StringType, StructTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_udf_compaxtype')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.createDataFrame(data=[(1,'张三 广州'),(2,'李四 深圳')],schema='id int,name_address string')init_df.printSchema()init_df.show()init_df.createTempView('tmp')# 3- 数据处理# 3.1- 创建自定义的Python函数def my_split(name_address):data_list = name_address.split(' ')# 返回列表# return data_list# 返回元组。与列表表现一致# return tuple(data_list)# 返回字典。需要注意,字典中的key需要和schema中的字段名保持一致,否则获取不到值,以null填充# return {"n": data_list[0], "a": data_list[1]}return {"name":data_list[0],"address":data_list[1]}# 3.2- 将Python函数注册到Spark SQLschema = StructType().add("name",StringType()).add("address",StringType())dsl_my_split = spark.udf.register('sql_my_split',my_split,schema)# 3.3- 在SQL/DSL中调用# SQL方式sql_result = spark.sql("""selectid,name_address,sql_my_split(name_address) as new_field,sql_my_split(name_address)['name'] as name,sql_my_split(name_address)['address'] as addressfrom tmp""")sql_result.printSchema()sql_result.show()# DSL方式init_df.select("id","name_address",dsl_my_split("name_address").alias("new_field"),dsl_my_split("name_address")['name'].alias("name"),dsl_my_split("name_address")['address'].alias("address")).show()# 4- 数据输出# 5- 释放资源spark.stop()
运行结果截图:
01_数据清洗的dropDuplicates函数.py
dropDuplicates 行来看,保留第一个重复值,有subset也是
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 先读取数据生成df对象df = spark.read.csv(path='file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv',sep=',',header=True)# 测试是否有数据print(df.count())df.show()print('-------------------------------------------------------------')# 对df数据清洗# dropDuplicates(): 删除重复数据,默认整行一样就删除df1 = df.dropDuplicates(subset=['id','name'])# 测试数据是否清洗print(df1.count())df1.show()print('-------------------------------------------------------------')# dropDuplicates(): 删除重复数据,默认整行一样就删除# 参数subset: 指定删除哪几列数据重复,默认整行一样就删除# 测试数据是否清洗# 注意: 最后一定释放资源sc.stop()spark.stop()### dropDuplicates ==行来看,保留第一个重复值,有subset也是==
结果
02_数据清洗的dropna函数.py
dropna 行来看,how默认=any行中有空就删行,how='all’行中全部都是空才删行,subset=[‘name’]列中有空删行,thresh=3最少有三个 正常值(非空值)
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 先读取数据生成df对象# 注意: 如果读取的文件首行是表头,可以使用header=True,把首行作为表头使用df = spark.read.csv(path='file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv',sep=',',header=True)# 测试是否有数据print(df.count())df.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行df1 = df.dropna()# 测试是否数据被清洗print(df1.count())df1.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行# how参数: how默认=any行中有空就删行,how='all'行中全部都是空才删行df2 = df.dropna(how='all')# 测试是否数据被清洗print(df2.count())df2.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行# subset参数: 指定哪几列有空值就删除对应行df3 = df.dropna(subset=['name'])# 测试是否数据被清洗print(df3.count())df3.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行# thresh参数: 表示至少有thresh个非空值才保留,默认是1df3 = df.dropna(thresh=3)# 测试是否数据被清洗print(df3.count())df3.show()# 注意: 最后一定释放资源sc.stop()spark.stop()### dropna ==行来看,how默认=any行中有空就删行,how='all'行中全部都是空才删行,subset=['name']列中有空删行,thresh=3最少有三个 正常值(非空值)==
结果
03_数据清洗的fillna函数.py
fillna 列来看,value用指定值填充空值
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 读取文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)df.show()print('--------------------------------------------------------------------------------')# fillna(): 默认用指定值填充所有空值# value: 制定单值df2 = df.fillna(value='未知')df2.show()print('--------------------------------------------------------------------------------')# value: 制定单值# subset: 指定哪几列有空值就填充对应列df3 = df.fillna(value='未知', subset=['name', 'age'])df3.show()print('--------------------------------------------------------------------------------')# value: 制定字典(每个字段都能单独指定值)# 通过字典方式填充空值,k就是列名,v就是要填充的值df4 = df.fillna(value={'name': '未知', 'age': 0, 'address': '深圳'})df4.show()# 注意: 最后一定释放资源sc.stop()spark.stop()### fillna ==列来看,value用指定值填充空值==
结果
04_清洗后数据存储到文件.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 读取原始未清洗文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)# 清洗df数据etl_df = df.dropDuplicates().fillna('未知')# 输出清洗后的数据 方式1: 各种文件 方式2: 数据库# 先演示 输出到文件# 注意: 路径可以hdfs可以linux本地,分隔符也可以自定义etl_df.write.csv(path='hdfs://node1:8020/output',sep='\001',header=True,mode='overwrite',encoding='utf-8')data = "John\00125\001Male\001New York"fields = data.split('\001')print(fields) # 输出:['John', '25', 'Male', 'New York']# 注意: 最后一定释放资源sc.stop()spark.stop()
结果
05_清洗后数据存储到mysql数据库.py
# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 读取原始未清洗文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)# 清洗df数据etl_df = df.dropDuplicates().fillna('未知')# 输出清洗后的数据 方式1: 各种文件 方式2: 数据库# 再演示 输出到数据库# 注意: 输出到数据库,需要先在spark中配置jdbc驱动,否则会报错etl_df.write.jdbc(url="jdbc:mysql://node1:3306/数据库名字?useUnicode=true&characterEncoding=utf-8",table="clear_data_etl_tb2",mode="append",properties={"user": "root","password": "123456"})###############################################################################################etl_df.write.jdbc(url="jdbc:mysql://{host}:{port}/{db_name}?useUnicode=true&characterEncoding=utf-8",table="表名", # 替换表名,例如 "clear_data_etl_tb2"mode="模式", # 替换模式,例如 "append" 或 "overwrite"properties={"user": "用户", # 替换用户名,例如 "root""password": "密码" # 替换密码,例如 "123456"})################################################################################################# 注意: 最后一定释放资源sc.stop()spark.stop()
结果
07_清洗后数据存储到文件_优化分区数.py
# 导包
import os
import timefrom pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 任务前: 修改saprkSQL的分区数spark.conf.set("spark.sql.shuffle.partitions", '1')# TODO: 获取SparkSQL的分区数print(spark.conf.get("spark.sql.shuffle.partitions"))# 读取原始未清洗文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)# 清洗df数据etl_df = df.dropDuplicates().fillna('未知')# 输出清洗后的数据 方式1: 各种文件 方式2: 数据库# 先演示 输出到文件# 注意: path路径可以hdfs可以linux本地,sep分隔符也可以自定义etl_df.write.csv(path='hdfs://node1:8020/output1',sep='\001',header=True,mode='overwrite',encoding='utf-8')# 为了查看分区效果,可以先让程序休息会儿time.sleep(500)# 注意: 最后一定释放资源spark.stop()
结果
08_电影数据分析案例.py
# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def get_user_avg_score():"""需求一: 统计每个用户的平均评分"""# 方式1: SQL风格spark.sql("""select user_id,round(avg(score),2) as avg_scorefrom movie_tbgroup by user_id""").show()# 方式2: DSL风格movie_df.groupby("user_id").agg(F.round(F.avg("score"), 2).alias("avg_score")).show()def get_big_avg_score_movie_cnt():"""需求: 获取大于平均分电影数量1- 统计所有打分的平均分,这个结果就是一个数字2- 统计每部电影各自的平均分3- 查询大于平均分的电影的数量"""# 方式1: SQL风格spark.sql("""with t as (select movie_id,avg(score) from movie_tb group by movie_id having avg(score) > (select avg(score) as all_movie_avg_score from movie_tb))select count(1) as cnt from t""").show()# 方式2: DSL风格movie_df.groupby("movie_id").agg(F.avg("score").alias("movie_avg_score")).where(F.col("movie_avg_score") > movie_df.select(F.avg("score").alias("all_movie_avg_score")).first()['all_movie_avg_score']).agg(F.count('movie_avg_score').alias("cnt")).show()def get_big_score_top1_user_avg_score_sql():"""需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户3- 统计该用户所有打分记录的平均分"""# 1 - 筛选出高分电影。统计每部电影的平均分,再过滤出 > 3分的电影信息spark.sql("""select movie_id,avg(score) as movie_avg_scorefrom movie_tbgroup by movie_idhaving movie_avg_score > 3""").createTempView("high_score_movie_tb")# 2 - 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户(不考虑并列)spark.sql("""select user_id,count(*) as cntfrom high_score_movie_tb hjoin movie_tb m on h.movie_id = m.movie_idgroup by user_idorder by cnt desclimit 1""").createTempView("top1_user_tb")# 3 - 统计该用户所有打分记录的平均分spark.sql("""select user_id,round(avg(score),2) as avg_scorefrom movie_tb where user_id = (select user_id from top1_user_tb)group by user_id""").show()def get_big_score_top1_user_avg_score_dsl():"""需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户3- 统计该用户所有打分记录的平均分"""# 1 - 筛选出高分电影。统计每部电影的平均分,再过滤出 > 3分的电影信息high_score_movie_df = movie_df.groupby("movie_id").agg(F.avg("score").alias('movie_avg_score')).where(F.col("movie_avg_score") > 3)# 2 - 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户top1_user_df = high_score_movie_df.join(movie_df, on="movie_id", how="inner").groupby("user_id").agg(F.count("user_id").alias("cnt")).orderBy("cnt", ascending=False).limit(1)# 3 - 统计该用户所有打分记录的平均分movie_df.where(F.col("user_id") == top1_user_df.first()['user_id']).groupby("user_id").agg(F.round(F.avg("score"), 2).alias("avg_score")).show()if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.数据收集df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/u.data",sep="\t",schema="user_id string, movie_id string, score int, dt string")df.show()df.printSchema()print(df.count())# 2.数据处理movie_df = df.dropDuplicates().dropna()print(movie_df.count())# 3.数据分析# 因为多个需求都会用到临时表,所以提前创建临时表movie_df.createTempView("movie_tb")# 需求一: 统计每个用户的平均评分# get_user_avg_score()# 需求三: 查询大于平均分的电影的数量# get_big_avg_score_movie_cnt()# 需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少# get_big_score_top1_user_avg_score_sql()# get_big_score_top1_user_avg_score_dsl()# 4.数据可视化# 可视化本次略# 注意: 最后一定释放资源spark.stop()
09_sparkSQL中应用开窗函数.py
# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 数据收集df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/cookie.txt",sep=',',schema="cid string,dt string,pv int")df.show()# 数据处理etldf = df.dropDuplicates().fillna('未知')# 数据分析# 需求: 统计每个cookie的top3的pv值# 方式1: SQL方式etldf.createTempView("cookie_tb")spark.sql("""with t as (select cid,dt,pv,row_number() over(partition by cid order by pv desc) as rnfrom cookie_tb)select * from t where rn <=3""").show()# 方式2: dsl方式etldf.select("cid","dt","pv",F.row_number().over(Window.partitionBy("cid").orderBy(F.desc("pv"))).alias("rn")).where("rn <= 3").show()# 数据可视化: 略# 注意: 最后一定释放资源sc.stop()spark.stop()