day06_Spark SQL

文章目录

  • day06_Spark SQL课程笔记
    • 一、今日课程内容
    • 二、DataFrame详解(掌握)
      • 5.清洗相关的API
      • 6.Spark SQL的Shuffle分区设置
      • 7.数据写出操作
        • 写出到文件
        • 写出到数据库
    • 三、Spark SQL的综合案例(掌握)
      • 1、常见DSL代码整理
      • 2、电影分析案例
        • 需求说明:
        • 需求分析:
    • 四、Spark SQL函数定义
      • 1、窗口函数
      • 2、SQL函数分类
      • 3、Spark原生自定义UDF函数
  • 01_数据清洗的dropDuplicates函数.py
      • dropDuplicates ==行来看,保留第一个重复值,有subset也是==
    • 结果
  • 02_数据清洗的dropna函数.py
      • dropna ==行来看,how默认=any行中有空就删行,how='all'行中全部都是空才删行,subset=['name']列中有空删行,thresh=3最少有三个 正常值(非空值)==
    • 结果
  • 03_数据清洗的fillna函数.py
      • fillna ==列来看,value用指定值填充空值==
    • 结果
  • 04_清洗后数据存储到文件.py
    • 结果
  • 05_清洗后数据存储到mysql数据库.py
    • 结果
  • 07_清洗后数据存储到文件_优化分区数.py
    • 结果
  • 08_电影数据分析案例.py
  • 09_sparkSQL中应用开窗函数.py

day06_Spark SQL课程笔记

一、今日课程内容

  • 1- DataFrame详解(掌握)
  • 2- Spark SQL的综合案例(掌握)
  • 3- Spark SQL函数定义(掌握)

今日目的:掌握DataFrame详解

在这里插入图片描述

二、DataFrame详解(掌握)

5.清洗相关的API

API是什么

  1. 简单来说:API(应用程序编程接口)就像是“软件之间的翻译官”,定义了不同系统或组件之间如何交互和通信,让开发者能够更方便地使用外部功能或服务。

  2. 具体而言

    • 定义:API是一组预定义的函数、协议和工具,用于构建软件应用程序。它规定了如何请求服务、传递数据以及接收结果。
    • 类型
      • Web API:基于HTTP协议,用于Web服务之间的通信,如RESTful API、GraphQL。
      • 库API:编程语言提供的函数库,如Python的NumPy、Pandas。
      • 操作系统API:操作系统提供的接口,如Windows API、Linux系统调用。
    • 工作原理
      • 请求:客户端发送请求(如HTTP请求)到API。
      • 处理:服务器接收请求并处理。
      • 响应:服务器返回结果(如JSON数据)给客户端。
  3. 实际生产场景

    • 在移动应用中,使用Web API获取天气数据或支付服务。
    • 在数据分析中,使用Python库API(如Pandas)处理数据。
    • 在系统开发中,使用操作系统API管理文件或进程。
  4. 总之:API是软件开发的基石,通过标准化接口简化了系统间的交互,提高了开发效率和代码复用性。

总结:
1- dropDuplicates(subset):用来删除重复数据。1.1- 如果没有指定参数subset,那么要比对行中的所有字段内容,如果全部相同,就认为是重复数据,会被删除;1.2- 如果有指定参数subset,那么只比对subset中指定的字段范围内删除重复数据2- dropna(thresh,subset):删除缺失值数据.2.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据2.2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内2.3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh个字段的值不为空,才不会被删除3- fillna(value,subset): 替换缺失值数据3.1- value: 必须要传递参数.是用来填充缺失值的,默认填充所有的缺失值3.1- subset: 如果有指定参数subset,那么只比对subset中指定的字段范围内替换注意:value最常用的是传递字典的形式

代码演示:

# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def demo1_dropDuplicates():# dropDuplicates: 如果有重复就会去重,保留第一个,当然也可以指定参考的列df.dropDuplicates().show()df.dropDuplicates(['name']).show()def demo2_dropna():# dropna: 默认去除带null每行数据,当然也可以指定参考的列df.dropna().show()df.dropna(thresh=2).show()df.dropna(thresh=1, subset=['id', 'name']).show()def demo3_fillna():# fillna: 默认只要带null就补充指定内容,当然也可以指定参考列df.fillna('空').show()df.fillna('空', subset=['address']).show()df.fillna({'name': '未知', 'address': '广州'}).show()# 创建main函数
if __name__ == '__main__':# 1.创建spark对象# appName:应用程序名称  master:提交模式# getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的spark = SparkSession.builder.appName('sparksql_demo').master('local[*]').getOrCreate()# 2.通过read读取外部文件方式创建DF对象df = spark.read \.format('csv') \.option('header', True) \.load('file:///export/data/spark_project/spark_sql/data/clear_data.csv')# 3.show直接展示df.show()# 4.清洗数据# dropDuplicates: 如果有重复就会去重,保留第一个,当然也可以指定参考的列# demo1_dropDuplicates()print('------------------------------------------------------')# dropna: 默认去除带null每行数据,当然也可以指定参考的列# demo2_dropna()print('------------------------------------------------------')# fillna: 默认只要带null就补充指定内容,当然也可以指定参考列demo3_fillna()# 5.释放资源spark.stop()

6.Spark SQL的Shuffle分区设置

补充:

如果运行sparksql,发现Shuffle分区每次都是1,或者后续count_distinct找不到,那么是因为pyspark版本原因导致。解决办法如下:

1- 检查自己3台机器的pyspark版本是否是3.1.2版本

pip list | grep pyspark

2-如果不是3.1.2版本,那么先卸载pyspark

命令: pip uninstall pyspark

3- 再按照【Spark课程阶段_部署文档.doc】中重新安装3.1.2版本pyspark

命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.1.2

​ Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行

​ Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小
在这里插入图片描述

如何调整shuffle分区数量呢? spark.sql.shuffle.partitions

方案一(不推荐):  直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为: spark.sql.shuffle.partitions     20方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候./spark-submit --conf "spark.sql.shuffle.partitions=20"方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。
设置shuffle分区的数量方式1:SparkSession.builder.config('spark.sql.shuffle.partitions', 1)
设置shuffle分区的数量方式2:spark.conf.set("spark.sql.shuffle.partitions",'1')获取shuffle分区的数量:spark.conf.get("spark.sql.shuffle.partitions")
# 导包
import os
import time
from pyspark.sql import SparkSession, functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 7.1 TODO: 记录程序开始时间start = time.time()# 1.创建spark对象# appName:应用程序名称  master:提交模式# getOrCreate:在builder构建器中获取一个存在的SparkSession,如果不存在,则创建一个新的spark = SparkSession.builder \.config('spark.sql.shuffle.partitions', 1)\.appName('sparksql_demo')\.master('local[*]')\.getOrCreate()# 获取shuffle分区的数量shuffle_partitions = spark.conf.get("spark.sql.shuffle.partitions")print("Shuffle partitions:", shuffle_partitions)# 2.通过read读取外部文件方式创建DF对象df = spark.read \.format('text') \.schema('words string') \.load('file:///export/data/spark_project/spark_sql/data/data3.txt')print(type(df))# 需求: 从data3.txt读取所有单词,然后统计每个单词出现的次数# 3.SQL风格# 方式1: 使用子查询方式# 先创建临时视图,然后通过sql语句查询展示df.createTempView('words_tb')qdf = spark.sql("select words,count(1) as cnt from (select explode(split(words,' ')) as words from words_tb) t group by words")print(type(qdf))qdf.show()# 4.DSL风格# 方式2: 分组后用agg函数df.select(F.explode(F.split('words', ' ')).alias('words')).groupBy('words').agg(F.count('words').alias('cnt')).show()# 7.2 TODO:记录结束时间end = time.time()# 7.3 计算运行时间# 结论: 合理设置分区数,效率会提高!t = end - startprint(f"程序运行了{t}秒")# 6.TODO:为了方便查看web页面可以让程序多睡会儿# time.sleep(1000)# 5.释放资源spark.stop()

7.数据写出操作

写出到文件

统一的输出语法:
在这里插入图片描述

对应的简写API格式如下,以CSV为例:
init_df.write.csv(path='存储路径',mode='模式',header=True,sep='\001',encoding='UTF-8'
)
常用参数说明:1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统2- mode:当输出目录中文件已经存在的时候处理办法2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path 	file:xxx already exists.3- sep:字段间的分隔符4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True5- encoding:文件输出的编码方式
  • 演示1: 输出到文件中 json csv orc text …
# 导包
import os
from pyspark.sql import SparkSession# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()spark.conf.set("spark.sql.shuffle.partitions", '1')# 2.数据输入df = spark.read\.format('csv')\.option('header',True)\.load('file:///export/data/spark_project/spark_sql/data/clear_data.csv')# 3.数据处理(切分,转换,分组聚合)etldf = df.dropDuplicates().dropna()etldf.show()# 4.数据输出# 原始APIetldf.write\.format('csv')\.option('sep',',')\.option('header',True)\.mode('overwrite')\.save('file:///export/data/spark_project/spark_sql/data/output')# 简化APIetldf.write.csv(sep=',',header=True,mode='overwrite',path='file:///export/data/spark_project/spark_sql/data/output2')# 5.关闭资源spark.stop()
写出到数据库
  • 将结果数据基于JDBC方案, 输出到关系型数据库, 例如说: MySql
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("数据输出到数据库")# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions","1")\.appName('sparksql_database')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',sep=' ',encoding='UTF-8',header="True",inferSchema=True)# 3- 数据处理result = init_df.where('age>=20')# 4- 数据输出result.show()result.printSchema()# 数据输出到数据"""创建数据库命令:create database day06 character set utf8;"""result.write.jdbc(url='jdbc:mysql://node1:3306/day06?useUnicode=true&characterEncoding=utf-8',table='student',mode='append',sql={ 'user' : 'root', 'password' : '123456' })# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述

可能出现的错误一:
在这里插入图片描述

    etldf.write.jdbc(url='jdbc:mysql://node1:3306/库名',table='表名',mode='append',# 解决方法: 给密码123456加上引号sql={'user':'root','password':'123456'})

可能出现的错误 二:
在这里插入图片描述

原因:  缺少连接MySQL数据库的驱动
解决方法如下:
数据库的驱动包, 一般都是一些Jar包,放置【mysql-connector-java-5.1.41.jar】驱动包到以下位置:1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,目录位置: /export/server/spark/jars2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包目录位置: /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars  ....

可能出现的错误三:
在这里插入图片描述

在这里插入图片描述

原因:将中文输出到了数据表中
解决办法:
1- 数据库连接要加上:  ?useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码: character set utf8
    # 最终连接数据代码如下:etldf.write.jdbc(url='jdbc:mysql://node1:3306/spark_db1?useUnicode=true&characterEncoding=utf-8',table='student',mode='append',sql={'user':'root','password':'123456'})

三、Spark SQL的综合案例(掌握)

1、常见DSL代码整理

分类格式含义示例
API/方法select查询字段select(‘id1’, ‘id2’)
where对数据过滤where(‘avg_score>3’)
groupBy对数据分组groupBy(‘userid’)
orderBy对数据排序orderBy(‘cnt’, ascending=False)
limit取前几条数据orderBy(‘cnt’, ascending=False).limit(1)
agg聚合操作,里面可以写多个聚合表达式agg(F.round(F.avg(‘score’), 2).alias(‘avg_score’))
show打印数据init_df.show()
printSchema打印数据的schema信息,也就是元数据信息init_df.printSchema()
alias对字段取别名F.count(‘movieid’).alias(‘cnt’)
join关联2个DataFrameetl_df.join(avg_score_dsl_df, ‘movieid’)
withColumn基于目前的数据产生一个新列init_df.withColumn(‘word’,F.explode(F.split(‘value’, ’ ')))
dropDuplicates删除重复数据init_df.dropDuplicates(subset=[“id”,“name”])
dropna删除缺失值init_df.dropna(thresh=2,subset=[“name”,“age”,“address”])
fillna替换缺失值init_df.fillna(value={“id”:111,“name”:“未知姓名”,“age”:100,“address”:“北京”})
first取DataFrame中的第一行数据
over创建一个窗口列
窗口partitionBy对数据分区
orderBy对数据排序orderBy(F.desc(‘pv’))
函数row_number行号。从1开始编号
desc降序排序
avg计算均值
count计数
round保留小数位
col将字段包装成Column对象,一般用于对新列的包装
1- 什么使用使用select(),什么时候使用groupBy()+agg()/select()实现聚合?:如果不需要对数据分组,那么可以直接使用select()实现聚合;如果有分组操作,需要使用groupBy()+agg()/select(),推荐使用agg()2- first(): 如果某个DataFrame中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较3- F.col(): 对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象,然后去使用
  • API/方法:是由DataFrame来调用
  • 函数:需要先通过import pyspark.sql.functions as F导入,使用F调用。Spark SQL内置提供的函数https://spark.apache.org/docs/3.1.2/api/sql/index.html
  • 窗口:需要先通过from pyspark.sql import Window导入

2、电影分析案例

需求说明:

数据集的介绍:
在这里插入图片描述

数据说明 :  userid,movieid,score,datestr字段的分隔符号为:  \t

在这里插入图片描述

需求分析:
  • 需求一: 查询用户平均分

    需求分析:

    维度:用户
    指标:平均分

  • 需求二: 查询每部电影的平均分(课后作业,将自己对需求的分析步骤以文字的形式放在代码中)

  • 需求三: 查询大于平均分的电影的数量

    需求分析:

    1- 统计所有打分的平均分,这个结果就是一个数字

    2- 统计每部电影各自的平均分

    3- 查询大于平均分的电影的数量

  • 需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少

    需求分析:

    1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息

    2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户

    3- 统计该用户所有打分记录的平均分

  • 需求五: 查询每个用户的平均打分, 最低打分, 最高打分(课后作业)

  • 需求六: 查询被评分超过100次的电影的平均分 排名 TOP10(课后作业)

一三四需求实现代码:

# 导包
import os
from pyspark.sql import SparkSession,functions as F# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def demo1_get_user_avg_score():# 方式1: SQL方式spark.sql('select userid,round(avg(score),3) as user_avg_score from movie group by userid').show()# 方式2: DSL方式etldf.groupBy('userid').agg(F.round(F.avg('score'), 3).alias('user_avg_score')).show()def demo2_get_lag_avg_movie_cnt():# 方式1: SQLspark.sql("""select count(1) as cnt from (select movieid,avg(score) as movie_avg_score from movie group by movieidhaving movie_avg_score > (select avg(score) as all_avg_score from movie)  ) t""").show()# 方式2: DSL# col():把临时结果作为新列使用   first():取第一个值etldf.groupBy('movieid').agg(F.avg('score').alias('movie_avg_score')).where(F.col('movie_avg_score') > etldf.select(F.avg('score').alias('all_avg_score')).first()['all_avg_score']).agg(F.count('movieid').alias('cnt')).show()def demo3_get_top1_user_avg_sql():# 方式1: SQL# ①先查询高分电影:spark.sql("select movieid,avg(score) as movie_avg_score from movie group by movieid having movie_avg_score > 3").createTempView('hight_score_tb')# ②再求打分次数最多的用户(先不考虑并列,只取最大1个)spark.sql("select userid,count(1) as cnt from hight_score_tb h join movie m on h.movieid = m.movieid group by userid order by cnt desc limit 1").createTempView('top1_user_tb')# ③最后求此人所有打分的平均分spark.sql("select avg(score) as top1_user_avg from movie where userid = (select userid from top1_user_tb)").show()def demo3_get_top1_user_avg_dsl():# ①先查询高分电影:hight_score_df = etldf.groupBy('movieid').agg(F.avg('score').alias('movie_avg_score')).where('movie_avg_score>3')# ②再求打分次数最多的用户(先不考虑并列,只取最大1个)top1_user_df = hight_score_df.join(etldf, on=hight_score_df['movieid'] == etldf['movieid']) \.groupBy('userid').agg(F.count('userid').alias('cnt')) \.orderBy('cnt', ascending=False).limit(1)# ③最后求此人所有打分的平均分etldf.where(etldf['userid'] == top1_user_df.first()['userid']).agg(F.avg('score').alias('top1_user_avg')).show()# 创建main函数
if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.数据输入df = spark.read.csv(schema='userid string,movieid string,score int,datestr string',sep='\t',path='file:///export/data/spark_project/spark_sql/data/u.data')print(df.count())# 3.数据处理(切分,转换,分组聚合)etldf = df.dropDuplicates().dropna()print(etldf.count())# 4.数据分析# 方便后续所有SQL方式使用,提前创建临时视图作为表etldf.createTempView('movie')# 需求1: 查询用户的平均分# demo1_get_user_avg_score()# 需求3: 查询大于平均分的电影的数量# demo2_get_lag_avg_movie_cnt()# 需求4: 查询高分电影(平均分>3)中,打分次数最多的用户,并求出此人所有打分的平均分# 方式1: SQLdemo3_get_top1_user_avg_sql()# 方式2: DSLdemo3_get_top1_user_avg_dsl()# 5.数据输出# 6.关闭资源spark.stop()

附录: 问题

可能出现的错误一:
在这里插入图片描述

原因: 是使用withColumn产生新列,但是表达式中有聚合的操作。缺少groupBy调用

可能出现的错误二:
在这里插入图片描述

错误原因:DataFrame结果是单行的情况,列值获取错误
在这里插入图片描述

解决办法:

将df_total_avg_score['total_avg_score']改成df_total_avg_score.first()['total_avg_score']

可能遇到的错误三:
在这里插入图片描述

原因:对于在计算过程中临时产生的字段,需要使用F.col封装成Column对象

解决办法:F.col(‘avg_score’)

四、Spark SQL函数定义

1、窗口函数

回顾之前学习过的窗口函数:

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])分析函数可以大致分成如下3类:
1- 第一类: 聚合函数 sum() count() avg() max() min()
2- 第二类: row_number() rank() dense_rank() ntile()
3- 第三类: first_value() last_value() lead() lag()

在Spark SQL中使用窗口函数案例:

需求是找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config('spark.sql.shuffle.partitions',1)\.appName('sparksql_win_function')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.read.csv(path='file:///export/data/gz16_pyspark/02_spark_sql/data/cookie.txt',schema='cookie string,datestr string,pv int',sep=',',encoding='UTF-8')init_df.createTempView('win_data')init_df.show()init_df.printSchema()# 3- 数据处理# SQLspark.sql("""select cookie,datestr,pvfrom (selectcookie,datestr,pv,row_number() over (partition by cookie order by pv desc) as rnfrom win_data) tmp where rn<=3""").show()# DSL"""select:注意点,结果中需要看到哪几个字段,就要明确写出来"""init_df.select("cookie","datestr","pv",F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')).where('rn<=3').select("cookie","datestr","pv").show()# 4- 数据输出# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述

2、SQL函数分类

SQL函数,主要分为以下三大类:

  • UDF函数:用户自定义函数
    • 特点:一对一,输入一个得到一个
    • 例如:split() substr()
    • 在Hive中继承UDF类,方法名称为evaluate,返回值不能为void,其实就是实现一个方法;
  • UDAF函数:用户自定义聚合函数
    • 特点:多对一,输入多个得到一个
    • 例如:sum() avg() count() min()
  • UDTF函数:用户自定义表数据生成函数
    • 特点:一对多,输入一个得到多个
    • 例如:explode()
  1. 简单来说:SQL函数中的UDF、UDAF和UDTF就像是“数据处理的三剑客”,分别擅长“单点突破”、“团队协作”和“多点开花”,满足不同的数据处理需求。

  2. 具体而言

    • UDF(用户自定义函数)
      • 比喻:像是“单兵作战”,一对一处理数据。
      • 特点:输入一个值,输出一个值。
      • 示例split()(字符串分割)、substr()(字符串截取)。
      • 实现:在Hive中继承UDF类,实现evaluate方法。
    • UDAF(用户自定义聚合函数)
      • 比喻:像是“团队协作”,多对一聚合数据。
      • 特点:输入多行数据,输出一个聚合结果。
      • 示例sum()(求和)、avg()(平均值)、count()(计数)。
      • 实现:在Hive中继承UDAF类,实现init()iterate()merge()等方法。
    • UDTF(用户自定义表生成函数)
      • 比喻:像是“多点开花”,一对多展开数据。
      • 特点:输入一行数据,输出多行数据。
      • 示例explode()(将数组或Map展开为多行)。
      • 实现:在Hive中继承GenericUDTF类,实现initialize()process()等方法。
  3. 实际生产场景

    • 在数据清洗中,使用UDF对字符串进行格式化处理。
    • 在数据分析中,使用UDAF计算复杂的业务指标,如加权平均值。
    • 在数据展开中,使用UDTF将嵌套的JSON数据拆分为多行,便于后续分析。
  4. 总之:UDF、UDAF和UDTF是SQL中强大的扩展工具,分别适用于“一对一”、“多对一”和“一对多”的数据处理场景,为业务需求提供了灵活的支持。

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

思考:有这么多的内置函数,为啥还需要自定义函数呢?

为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数


在这里插入图片描述

1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件,Python可以开发UDF、UDAF函数

​ 在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

​ 在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

Spark SQL原生存在的问题:大量的序列化和反序列
在这里插入图片描述

虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

3、Spark原生自定义UDF函数

自定义函数流程:

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可第二步: 将Python函数注册到Spark SQL中注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用注意: 如果通过方式一来注册函数, 【可以用在SQL和DSL】注册方式二:  udf对象 = F.udf(参数1,参数2)参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用注意: 如果通过方式二来注册函数,【仅能用在DSL中】注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

自定义演示一:

请自定义一个函数,完成对数据统一添加一个后缀名的操作

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
from pyspark.sql.types import StringTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("请自定义一个函数,完成对数据统一添加一个后缀名的操作_itheima")# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_udf_basetype')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.createDataFrame(data=[(1,'张三','广州'),(2,'李四','深圳')],schema='id int,name string,address string')init_df.printSchema()init_df.show()init_df.createTempView('tmp')# 3- 数据处理# 3.1- 创建自定义的Python函数def add_suffix(address):return address + "_itheima"# 3.2- 将Python函数注册到Spark SQL# 注册方式一dsl_add_suffix = spark.udf.register('sql_add_suffix',add_suffix,StringType())# 3.3- 在SQL/DSL中调用# SQLspark.sql("""selectid,name,address,sql_add_suffix(address) as new_addressfrom tmp""").show()# DSLinit_df.select("id","name","address",dsl_add_suffix("address").alias("new_address")).show()print("-"*30)# 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。# spark.sql("""#     select#         id,name,address,#         dsl_add_suffix(address) as new_address#     from tmp# """).show()# 注册方式二:UDF返回值类型传值方式一dsl2_add_suffix = F.udf(add_suffix,StringType())# DSLinit_df.select("id","name","address",dsl2_add_suffix("address").alias("new_address")).show()# 注册方式二:UDF返回值类型传值方式二dsl3_add_suffix = F.udf(add_suffix, 'string')# DSLinit_df.select("id","name","address",dsl3_add_suffix("address").alias("new_address")).show()# 注册方式三:语法糖/装饰器@F.udf(returnType=StringType())def add_suffix_candy(address):return address + "_itheima"# DSLinit_df.select("id","name","address",add_suffix_candy("address").alias("new_address")).show()# 4- 数据输出# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述

可能遇到的问题:
在这里插入图片描述

原因: 在错误的地方调用了错误的函数。spark.udf.register参数1取的函数名只能在SQL中使用,不能在DSL中用。

自定义演示二:

自定义UDF函数,让其返回值类型为复杂类型: 字典 列表 元组

1- 返回列表和元组,表现一致
2- 返回字典。需要注意,字典中的key需要和schema中的字段名保持一致,否则获取不到值,以null填充
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
from pyspark.sql.types import StringType, StructTypeos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.config("spark.sql.shuffle.partitions",1)\.appName('sparksql_udf_compaxtype')\.master('local[*]')\.getOrCreate()# 2- 数据输入init_df = spark.createDataFrame(data=[(1,'张三 广州'),(2,'李四 深圳')],schema='id int,name_address string')init_df.printSchema()init_df.show()init_df.createTempView('tmp')# 3- 数据处理# 3.1- 创建自定义的Python函数def my_split(name_address):data_list = name_address.split(' ')# 返回列表# return data_list# 返回元组。与列表表现一致# return tuple(data_list)# 返回字典。需要注意,字典中的key需要和schema中的字段名保持一致,否则获取不到值,以null填充# return {"n": data_list[0], "a": data_list[1]}return {"name":data_list[0],"address":data_list[1]}# 3.2- 将Python函数注册到Spark SQLschema = StructType().add("name",StringType()).add("address",StringType())dsl_my_split = spark.udf.register('sql_my_split',my_split,schema)# 3.3- 在SQL/DSL中调用# SQL方式sql_result = spark.sql("""selectid,name_address,sql_my_split(name_address) as new_field,sql_my_split(name_address)['name'] as name,sql_my_split(name_address)['address'] as addressfrom tmp""")sql_result.printSchema()sql_result.show()# DSL方式init_df.select("id","name_address",dsl_my_split("name_address").alias("new_field"),dsl_my_split("name_address")['name'].alias("name"),dsl_my_split("name_address")['address'].alias("address")).show()# 4- 数据输出# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述

01_数据清洗的dropDuplicates函数.py

dropDuplicates 行来看,保留第一个重复值,有subset也是

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 先读取数据生成df对象df = spark.read.csv(path='file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv',sep=',',header=True)# 测试是否有数据print(df.count())df.show()print('-------------------------------------------------------------')# 对df数据清洗# dropDuplicates(): 删除重复数据,默认整行一样就删除df1 = df.dropDuplicates(subset=['id','name'])# 测试数据是否清洗print(df1.count())df1.show()print('-------------------------------------------------------------')# dropDuplicates(): 删除重复数据,默认整行一样就删除# 参数subset: 指定删除哪几列数据重复,默认整行一样就删除# 测试数据是否清洗# 注意: 最后一定释放资源sc.stop()spark.stop()### dropDuplicates ==行来看,保留第一个重复值,有subset也是==

在这里插入图片描述

结果

在这里插入图片描述

02_数据清洗的dropna函数.py

dropna 行来看,how默认=any行中有空就删行,how='all’行中全部都是空才删行,subset=[‘name’]列中有空删行,thresh=3最少有三个 正常值(非空值)

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 先读取数据生成df对象# 注意: 如果读取的文件首行是表头,可以使用header=True,把首行作为表头使用df = spark.read.csv(path='file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv',sep=',',header=True)# 测试是否有数据print(df.count())df.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行df1 = df.dropna()# 测试是否数据被清洗print(df1.count())df1.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行# how参数: how默认=any行中有空就删行,how='all'行中全部都是空才删行df2 = df.dropna(how='all')# 测试是否数据被清洗print(df2.count())df2.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行# subset参数: 指定哪几列有空值就删除对应行df3 = df.dropna(subset=['name'])# 测试是否数据被清洗print(df3.count())df3.show()print('-------------------------------------------------------------')# 对df数据清洗# dropna() : 默认删除有空值的行# thresh参数: 表示至少有thresh个非空值才保留,默认是1df3 = df.dropna(thresh=3)# 测试是否数据被清洗print(df3.count())df3.show()# 注意: 最后一定释放资源sc.stop()spark.stop()### dropna ==行来看,how默认=any行中有空就删行,how='all'行中全部都是空才删行,subset=['name']列中有空删行,thresh=3最少有三个 正常值(非空值)==

在这里插入图片描述

结果

在这里插入图片描述

03_数据清洗的fillna函数.py

fillna 列来看,value用指定值填充空值

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 读取文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)df.show()print('--------------------------------------------------------------------------------')# fillna(): 默认用指定值填充所有空值# value: 制定单值df2 = df.fillna(value='未知')df2.show()print('--------------------------------------------------------------------------------')# value: 制定单值# subset: 指定哪几列有空值就填充对应列df3 = df.fillna(value='未知', subset=['name', 'age'])df3.show()print('--------------------------------------------------------------------------------')# value: 制定字典(每个字段都能单独指定值)# 通过字典方式填充空值,k就是列名,v就是要填充的值df4 = df.fillna(value={'name': '未知', 'age': 0, 'address': '深圳'})df4.show()# 注意: 最后一定释放资源sc.stop()spark.stop()### fillna ==列来看,value用指定值填充空值==

在这里插入图片描述

结果

在这里插入图片描述

04_清洗后数据存储到文件.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 读取原始未清洗文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)# 清洗df数据etl_df = df.dropDuplicates().fillna('未知')# 输出清洗后的数据 方式1: 各种文件   方式2: 数据库# 先演示 输出到文件# 注意: 路径可以hdfs可以linux本地,分隔符也可以自定义etl_df.write.csv(path='hdfs://node1:8020/output',sep='\001',header=True,mode='overwrite',encoding='utf-8')data = "John\00125\001Male\001New York"fields = data.split('\001')print(fields)  # 输出:['John', '25', 'Male', 'New York']# 注意: 最后一定释放资源sc.stop()spark.stop()

结果

在这里插入图片描述

在这里插入图片描述

05_清洗后数据存储到mysql数据库.py

# 导包
import os
from pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 读取原始未清洗文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)# 清洗df数据etl_df = df.dropDuplicates().fillna('未知')# 输出清洗后的数据 方式1: 各种文件   方式2: 数据库# 再演示 输出到数据库# 注意: 输出到数据库,需要先在spark中配置jdbc驱动,否则会报错etl_df.write.jdbc(url="jdbc:mysql://node1:3306/数据库名字?useUnicode=true&characterEncoding=utf-8",table="clear_data_etl_tb2",mode="append",properties={"user": "root","password": "123456"})###############################################################################################etl_df.write.jdbc(url="jdbc:mysql://{host}:{port}/{db_name}?useUnicode=true&characterEncoding=utf-8",table="表名",  # 替换表名,例如 "clear_data_etl_tb2"mode="模式",  # 替换模式,例如 "append" 或 "overwrite"properties={"user": "用户",  # 替换用户名,例如 "root""password": "密码"  # 替换密码,例如 "123456"})################################################################################################# 注意: 最后一定释放资源sc.stop()spark.stop()

结果

在这里插入图片描述

07_清洗后数据存储到文件_优化分区数.py

# 导包
import os
import timefrom pyspark.sql import SparkSession# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# TODO: 任务前: 修改saprkSQL的分区数spark.conf.set("spark.sql.shuffle.partitions", '1')# TODO: 获取SparkSQL的分区数print(spark.conf.get("spark.sql.shuffle.partitions"))# 读取原始未清洗文件生成df对象df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/clear_data.csv",sep=',',header=True)# 清洗df数据etl_df = df.dropDuplicates().fillna('未知')# 输出清洗后的数据 方式1: 各种文件   方式2: 数据库# 先演示 输出到文件# 注意: path路径可以hdfs可以linux本地,sep分隔符也可以自定义etl_df.write.csv(path='hdfs://node1:8020/output1',sep='\001',header=True,mode='overwrite',encoding='utf-8')# 为了查看分区效果,可以先让程序休息会儿time.sleep(500)# 注意: 最后一定释放资源spark.stop()

结果

在这里插入图片描述
在这里插入图片描述

08_电影数据分析案例.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'def get_user_avg_score():"""需求一: 统计每个用户的平均评分"""# 方式1: SQL风格spark.sql("""select user_id,round(avg(score),2) as avg_scorefrom movie_tbgroup by user_id""").show()# 方式2: DSL风格movie_df.groupby("user_id").agg(F.round(F.avg("score"), 2).alias("avg_score")).show()def get_big_avg_score_movie_cnt():"""需求: 获取大于平均分电影数量1- 统计所有打分的平均分,这个结果就是一个数字2- 统计每部电影各自的平均分3- 查询大于平均分的电影的数量"""# 方式1: SQL风格spark.sql("""with t as (select movie_id,avg(score) from movie_tb group by movie_id having avg(score) > (select avg(score) as all_movie_avg_score from movie_tb))select count(1) as cnt from t""").show()# 方式2: DSL风格movie_df.groupby("movie_id").agg(F.avg("score").alias("movie_avg_score")).where(F.col("movie_avg_score") > movie_df.select(F.avg("score").alias("all_movie_avg_score")).first()['all_movie_avg_score']).agg(F.count('movie_avg_score').alias("cnt")).show()def get_big_score_top1_user_avg_score_sql():"""需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户3- 统计该用户所有打分记录的平均分"""# 1 - 筛选出高分电影。统计每部电影的平均分,再过滤出 > 3分的电影信息spark.sql("""select movie_id,avg(score) as movie_avg_scorefrom movie_tbgroup by movie_idhaving movie_avg_score > 3""").createTempView("high_score_movie_tb")# 2 - 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户(不考虑并列)spark.sql("""select user_id,count(*) as cntfrom high_score_movie_tb hjoin movie_tb m on h.movie_id = m.movie_idgroup by user_idorder by cnt desclimit 1""").createTempView("top1_user_tb")# 3 - 统计该用户所有打分记录的平均分spark.sql("""select user_id,round(avg(score),2) as avg_scorefrom movie_tb where user_id = (select user_id from top1_user_tb)group by user_id""").show()def get_big_score_top1_user_avg_score_dsl():"""需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少1- 筛选出高分电影。统计每部电影的平均分,再过滤出>3分的电影信息2- 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户3- 统计该用户所有打分记录的平均分"""# 1 - 筛选出高分电影。统计每部电影的平均分,再过滤出 > 3分的电影信息high_score_movie_df = movie_df.groupby("movie_id").agg(F.avg("score").alias('movie_avg_score')).where(F.col("movie_avg_score") > 3)# 2 - 从高分电影中,统计每个用户分别打了多少次分。再选择TOP1的用户top1_user_df = high_score_movie_df.join(movie_df, on="movie_id", how="inner").groupby("user_id").agg(F.count("user_id").alias("cnt")).orderBy("cnt", ascending=False).limit(1)# 3 - 统计该用户所有打分记录的平均分movie_df.where(F.col("user_id") == top1_user_df.first()['user_id']).groupby("user_id").agg(F.round(F.avg("score"), 2).alias("avg_score")).show()if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 1.数据收集df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/u.data",sep="\t",schema="user_id string, movie_id string, score int, dt string")df.show()df.printSchema()print(df.count())# 2.数据处理movie_df = df.dropDuplicates().dropna()print(movie_df.count())# 3.数据分析# 因为多个需求都会用到临时表,所以提前创建临时表movie_df.createTempView("movie_tb")# 需求一: 统计每个用户的平均评分# get_user_avg_score()# 需求三: 查询大于平均分的电影的数量# get_big_avg_score_movie_cnt()# 需求四: 查询高分电影中(电影平均分大于3)打分次数最多的用户, 并且求出此人所有的打分记录中, 打的平均分是多少# get_big_score_top1_user_avg_score_sql()# get_big_score_top1_user_avg_score_dsl()# 4.数据可视化# 可视化本次略# 注意: 最后一定释放资源spark.stop()

09_sparkSQL中应用开窗函数.py

# 导包
import os
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window# 解决JAVA_HOME 未设置问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 先创建spark session对象spark = SparkSession.builder.appName("spark_demo").master("local[*]").getOrCreate()# 注意: 如果操作sparksql直接用spark对象即可,如果还有操作rdd的操作,需要获取sc对象sc = spark.sparkContext# 数据收集df = spark.read.csv(path="file:///export/data/spark_project/06_spark_sql_数据清洗_导出以及综合案例/resources/cookie.txt",sep=',',schema="cid string,dt string,pv int")df.show()# 数据处理etldf = df.dropDuplicates().fillna('未知')# 数据分析# 需求: 统计每个cookie的top3的pv值# 方式1: SQL方式etldf.createTempView("cookie_tb")spark.sql("""with t as (select cid,dt,pv,row_number() over(partition by cid order by pv desc) as rnfrom cookie_tb)select * from t where rn <=3""").show()# 方式2: dsl方式etldf.select("cid","dt","pv",F.row_number().over(Window.partitionBy("cid").orderBy(F.desc("pv"))).alias("rn")).where("rn <= 3").show()# 数据可视化: 略# 注意: 最后一定释放资源sc.stop()spark.stop()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/1016.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Copula算法原理和R语言股市收益率相依性可视化分析

阅读全文&#xff1a;http://tecdat.cn/?p6193 copula是将多变量分布函数与其边缘分布函数耦合的函数&#xff0c;通常称为边缘。在本视频中&#xff0c;我们通过可视化的方式直观地介绍了Copula函数&#xff0c;并通过R软件应用于金融时间序列数据来理解它&#xff08;点击文…

Spring Boot 支持哪些日志框架

Spring Boot 支持多种日志框架&#xff0c;主要包括以下几种&#xff1a; SLF4J (Simple Logging Facade for Java) Logback&#xff08;默认&#xff09;Log4j 2Java Util Logging (JUL) 其中&#xff0c;Spring Boot 默认使用 SLF4J 和 Logback 作为日志框架。如果你需要使…

OpenCV基础:视频的采集、读取与录制

从摄像头采集视频 相关接口 - VideoCapture VideoCapture 用于从视频文件、摄像头或其他视频流设备中读取视频帧。它可以捕捉来自多种源的视频。 主要参数&#xff1a; cv2.VideoCapture(source): source: 这是一个整数或字符串&#xff0c;表示视频的来源。 如果是整数&a…

Uniapp仿ChatGPT Stream流式输出(非Websocket)

Uniapp仿ChatGPT Stream流式输出&#xff08;非Websocket&#xff09; 前言&#xff1a;流式输出可以使用websocket也可以使用stream来实现EventSource是 HTML5 中的一个接口&#xff0c;用于接收服务器发送的事件流&#xff08;Server - Sent Events&#xff0c;SSE&#xff…

《自动驾驶与机器人中的SLAM技术》ch2:基础数学知识

目录 2.1 几何学 向量的内积和外积 旋转矩阵 旋转向量 四元数 李群和李代数 SO(3)上的 BCH 线性近似式 2.2 运动学 李群视角下的运动学 SO(3) t 上的运动学 线速度和加速度 扰动模型和雅可比矩阵 典型算例&#xff1a;对向量进行旋转 典型算例&#xff1a;旋转的复合 2.3 …

深入 Flutter 和 Compose 在 UI 渲染刷新时 Diff 实现对比

众所周知&#xff0c;不管是什么框架&#xff0c;在前端 UI 渲染时&#xff0c;都会有构造出一套相关的渲染树&#xff0c;并且在 UI 更新时&#xff0c;为了尽可能提高性能&#xff0c;一般都只会进行「差异化」更新&#xff0c;而不是对整个 UI Tree 进行刷新&#xff0c;所以…

Elasticsearch—索引库操作(增删查改)

Elasticsearch中Index就相当于MySQL中的数据库表 Mapping映射就类似表的结构。 因此我们想要向Elasticsearch中存储数据,必须先创建Index和Mapping 1. Mapping映射属性 Mapping是对索引库中文档的约束&#xff0c;常见的Mapping属性包括&#xff1a; type&#xff1a;字段数据类…

occ的开发框架

occ的开发框架 1.Introduction This manual explains how to use the Open CASCADE Application Framework (OCAF). It provides basic documentation on using OCAF. 2.Purpose of OCAF OCAF (the Open CASCADE Application Framework) is an easy-to-use platform for ra…

esp32在编译是报错在idf中有该文件,但是说没有

报错没有头文件esp_efuse_table.h D:/Espressif/frameworks/esp-idf-v5.3.1/components/driver/deprecated/driver/i2s.h:27:2: warning: #warning "This set of I2S APIs has been deprecated, please include driver/i2s_std.h, driver/i2s_pdm.h or driver/i2s_tdm.h …

git - 用SSH方式迁出远端git库

文章目录 git - 用SSH方式迁出远端git库概述笔记以gitee为例产生RSA密钥对 备注githubEND git - 用SSH方式迁出远端git库 概述 最近一段时间&#xff0c;在网络没问题的情况下&#xff0c;用git方式直接迁出git库总是会失败。 失败都是在远端, 显示RPC错误。 但是git服务器端…

http和https有哪些不同

http和https有哪些不同 1.数据传输的安全性&#xff1a;http非加密&#xff0c;https加密 2.端口号&#xff1a;http默认80端口&#xff0c;https默认443端口 3.性能&#xff1a;http基于tcp三次握手建立连接&#xff0c;https在tcp三次握手后还有TLS协议的四次握手确认加密…

超详细-java-uniapp小程序-引导关注公众号、判断用户是否关注公众号

目录 1、前期准备 公众号和小程序相互关联 准备公众号文章 注册公众号测试号 微信静默授权的独立html 文件 2&#xff1a; 小程序代码 webview页面代码 小程序首页代码 3&#xff1a;后端代码 1&#xff1a;增加公众号配置项 2&#xff1a;读取公众号配置项 3&…

【Python进阶——分布式计算框架pyspark】

Apache Spark是用于大规模数据处理的统一分析引擎 简单来说&#xff0c;Spark是一款分布式的计算框架&#xff0c;用于调度成百上千的服务器集群&#xff0c;计算TB、PB乃至EB级别的海量数据&#xff0c;Spark作为全球顶级的分布式计算框架&#xff0c;支持众多的编程语言进行开…

基于 FastExcel 与消息队列高效生成及导入机构用户数据

&#x1f3af; 本文档详细介绍了开发机构用户数据导入功能的必要性及实现方法&#xff0c;如针对教育机构如学校场景下提高用户体验和管理效率的需求。文中首先分析了直接对接学生管理系统与平台对接的优势&#xff0c;包括减少人工审核成本、提高身份验证准确性等。接着介绍了…

校园跑腿小程序---轮播图,导航栏开发

hello hello~ &#xff0c;这里是 code袁~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生…

前端练习题

图片&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html> <head><meta charset"UTF-8"><title>用户信息页面</title><style>body {font-family: Arial, sans-serif;margin: 20px;}.user-info {display: flex;align-it…

AllData是怎么样的一款数据中台产品?

&#x1f525;&#x1f525; AllData大数据产品是可定义数据中台&#xff0c;以数据平台为底座&#xff0c;以数据中台为桥梁&#xff0c;以机器学习平台为中层框架&#xff0c;以大模型应用为上游产品&#xff0c;提供全链路数字化解决方案。 ✨奥零数据科技官网&#xff1a;…

一学就废|Python基础碎片,OS模块

Python 中的操作系统模块提供了与操作系统交互的功能。操作系统属于 Python 的标准实用程序模块。该模块提供了一种使用依赖于操作系统的功能的可移植方式。os和os. path模块包括许多与文件系统交互的函数。 Python-OS 模块函数 我们将讨论 Python os 模块的一些重要功能&…

2.Numpy练习(1)

一.练习一&#xff1a; 1.打印当前numpy版本&#xff1a; 2.构造一个全零的矩阵&#xff0c;并打印其占用内存大小&#xff1a; 3.打印一个函数的帮助文档&#xff0c;比如numpy.add&#xff1a; 4.创建一个10~49数组&#xff0c;并将其倒序排列: 5.找到一个数组中不为0的索引…

Ubuntu Server挂载AWS S3成一个本地文件夹

2023年&#xff0c;AWS出了个mountpoint的工具&#xff1a; https://github.com/awslabs/mountpoint-s3 如下是另外一种方式&#xff0c;通过s3fs-fuse 这个工具 sudo apt-get install automake autotools-dev \fuse g git libcurl4-gnutls-dev libfuse-dev \libssl-dev libx…