Spark 7:Spark SQL 函数定义

SparkSQL 定义UDF函数

8e9d7ddcc2f04a4fa3d0b7e78921b4f8.png

21cc4f7f05aa4c4d8e9db15c1d271cb1.png

方式1语法:
udf对象 = sparksession.udf.register(参数1,参数2,参数3)
参数1:UDF名称,可用于SQL风格
参数2:被注册成UDF的方法名
参数3:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
方式2语法:
udf对象 = F.udf(参数1, 参数2)
参数1:被注册成UDF的方法名
参数2:声明UDF的返回值类型
udf对象: 返回值对象,是一个UDF对象,可用于DSL风格
其中F是:
from pyspark.sql import functions as F
其中,被注册成UDF的方法名是指具体的计算方法,如:
def add(x, y): x + y
add就是将要被注册成UDF的方法名

8dee25a8da254eec809eb0c8a5d95baf.png  

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7]).map(lambda x:[x])df = rdd.toDF(["num"])# TODO 1: 方式1 sparksession.udf.register(), DSL和SQL风格均可以使用# UDF的处理函数def num_ride_10(num):return num * 10# 参数1: 注册的UDF的名称, 这个udf名称, 仅可以用于 SQL风格# 参数2: UDF的处理逻辑, 是一个单独的方法# 参数3: 声明UDF的返回值类型, 注意: UDF注册时候, 必须声明返回值类型, 并且UDF的真实返回值一定要和声明的返回值一致# 返回值对象: 这是一个UDF对象, 仅可以用于 DSL 语法# 当前这种方式定义的UDF, 可以通过参数1的名称用于SQL风格, 通过返回值对象用户DSL风格udf2 = spark.udf.register("udf1", num_ride_10, IntegerType())# SQL风格中使用# selectExpr 以SELECT的表达式执行, 表达式 SQL风格的表达式(字符串)# select方法, 接受普通的字符串字段名, 或者返回值是Column对象的计算df.selectExpr("udf1(num)").show()# DSL 风格中使用# 返回值UDF对象 如果作为方法使用, 传入的参数 一定是Column对象df.select(udf2(df['num'])).show()# TODO 2: 方式2注册, 仅能用于DSL风格udf3 = F.udf(num_ride_10, IntegerType())df.select(udf3(df['num'])).show()df.selectExpr("udf3(num)").show()

2562d46c19ee47c3b20f47fea545fb36.png

af9b0504fc474fd8896cb20f93094922.png 

# coding:utf8
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 构建一个RDDrdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])df = rdd.toDF(["line"])# 注册UDF, UDF的执行函数定义def split_line(data):return data.split(" ")  # 返回值是一个Array对象# TODO1 方式1 构建UDFudf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))# DLS风格df.select(udf2(df['line'])).show()# SQL风格df.createTempView("lines")spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)# TODO 2 方式2的形式构建UDFudf3 = F.udf(split_line, ArrayType(StringType()))df.select(udf3(df['line'])).show(truncate=False)

0e2c268c421848cfbc639a80fc067838.png

fe0861021c4d4be5a8b0566697bf5905.png 

# coding:utf8
import string
import timefrom pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':# 0. 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName("test").\master("local[*]").\config("spark.sql.shuffle.partitions", 2).\getOrCreate()sc = spark.sparkContext# 假设 有三个数字  1 2 3  我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回# 比如传入1 我们返回 {"num":1, "letters": "a"}rdd = sc.parallelize([[1], [2], [3]])df = rdd.toDF(["num"])# 注册UDFdef process(data):return {"num": data, "letters": string.ascii_letters[data]}"""UDF的返回值是字典的话, 需要用StructType来接收"""udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\add("letters", StringType(), nullable=True))df.selectExpr("udf1(num)").show(truncate=False)df.select(udf1(df['num'])).show(truncate=False)

2034e0d567664867a1c76c153620c99e.png  

c5a05c06e0c24009abe4b20cdc81ed7e.png

 SparkSQL 使用窗口函数

f3cf9802e8084fc4bf2518765b500397.png

7d43ea9fba3c479094ff69bb62cd99da.png

# coding:utf8
# 演示sparksql 窗口函数(开窗函数)
import string
from pyspark.sql import SparkSession
# 导入StructType对象
from pyspark.sql.types import ArrayType, StringType, StructType, IntegerType
import pandas as pd
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder. \appName("create df"). \master("local[*]"). \config("spark.sql.shuffle.partitions", "2"). \getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([('张三', 'class_1', 99),('王五', 'class_2', 35),('王三', 'class_3', 57),('王久', 'class_4', 12),('王丽', 'class_5', 99),('王娟', 'class_1', 90),('王军', 'class_2', 91),('王俊', 'class_3', 33),('王君', 'class_4', 55),('王珺', 'class_5', 66),('郑颖', 'class_1', 11),('郑辉', 'class_2', 33),('张丽', 'class_3', 36),('张张', 'class_4', 79),('黄凯', 'class_5', 90),('黄开', 'class_1', 90),('黄恺', 'class_2', 90),('王凯', 'class_3', 11),('王凯杰', 'class_1', 11),('王开杰', 'class_2', 3),('王景亮', 'class_3', 99)
])
schema = StructType().add("name", StringType()). \add("class", StringType()). \add("score", IntegerType())
df = rdd.toDF(schema)
# 窗口函数只用于SQL风格, 所以注册表先
df.createTempView("stu")
# TODO 聚合窗口
spark.sql("""
SELECT *, AVG(score) OVER() AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER() AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu
# 两个SQL的结果集进行整合而来
spark.sql("""
SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu
""").show()
# SELECT *, AVG(score) OVER(PARTITION BY class) AS avg_score FROM stu 等同于
# SELECT * FROM stu
# SELECT AVG(score) FROM stu GROUP BY class
# 两个SQL的结果集进行整合而来
# TODO 排序窗口
spark.sql("""
SELECT *, ROW_NUMBER() OVER(ORDER BY score DESC) AS row_number_rank, 
DENSE_RANK() OVER(PARTITION BY class ORDER BY score DESC) AS dense_rank, 
RANK() OVER(ORDER BY score) AS rank
FROM stu
""").show()
# TODO NTILE
spark.sql("""
SELECT *, NTILE(6) OVER(ORDER BY score DESC) FROM stu
""").show()

SparkSQL支持UDF和UDAF定义,但在Python中,暂时只能定义UDF
UDF定义支持2种方式, 1:使用SparkSession对象构建. 2: 使用functions包中提供的UDF API构建. 要注意, 方式1可用DSL和SQL风格, 方式2 仅可用于DSL风格
SparkSQL支持窗口函数使用, 常用SQL中的窗口函数均支持, 如聚合窗口\排序窗口\NTILE分组窗口等

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/107235.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot+WebSocket搭建多人在线聊天环境

一、WebSocket是什么? WebSocket是在单个TCP连接上进行全双工通信的协议,可以在服务器和客户端之间建立双向通信通道。 WebSocket 首先与服务器建立常规 HTTP 连接,然后通过发送Upgrade标头将其升级为双向 WebSocket 连接。 WebSocket使得…

一个简单的web应用程序的创建

一个简单的web应用程序的创建 1、数据库设计与创建1.1、数据库系统1.2、Navicat Premium1.3、Power Designer2、使用maven创建SpringBoot项目2.1、配置maven2.2、安装idea2.3、使用idea创建maven项目2.4、根据需要配置pom.xml文件、配置项目启动相关的文件2.5、写SpringBoot项目…

darknet yolo make报错,缺少instance-segmenter.o的规则

文章目录 darknet yolo make报错,缺少instance-segmenter.o的规则报错原因解决办法新问题解决办法 补充g编译选项Makefile编译规则 darknet yolo make报错,缺少instance-segmenter.o的规则 报错原因 Makefile没有识别到对于instance-segmenter.o的编译…

匈牙利算法 in 二分图匹配

https://www.luogu.com.cn/problem/P3386 重新看这个算法,才发现自己没有理解。 左边的点轮流匹配,看是否能匹配成功。对右边的点进行记录是否尝试过 然后有空就进,别人能退的就进 遍历左部点: 尝试匹配过程:

使用 S3 生命周期精确管理对象生命周期

在亚马逊工作这些年,我发现 S3 的生命周期配置是管理对象生命周期的重要但复杂的工具。在这篇文章中,我将利用实战经验,深入剖析生命周期,从核心概念到实际应用。 亚马逊云科技开发者社区为开发者们提供全球的开发技术资源。这里有技术文档、开发案例、技术专栏、培训视频、活…

Redis初始以及安装

"梦却了无影踪,梦仍不曾改动" 初始Redis (1) Redis是什么? 要认识、学习一个软件,最重要的途径无一是去该软件的官方文档里瞅瞅、转悠转悠。 从官方文档的介绍中得知,Redis是一种工作于内存,…

文件上传漏洞-upload靶场1-2关 通过笔记(如何区分前段验证和后端验证)

文件上传漏洞-upload靶场1-2关 通过笔记(区分前段验证和后端验证) 前言 upload是一个文件上传的专用靶场,搭设也非常简单,只需要把相关源码文件放到apache的网站目录下即可使用,或者去github下载一键绿化包进行安装链…

基于SSM+vue框架的校园代购服务订单管理系统源码和论文

基于SSMvue框架的校园代购服务订单管理系统源码和论文070 开发工具:idea 数据库mysql5.7 数据库链接工具:navcat,小海豚等 技术:ssm 摘 要 在新发展的时代,众多的软件被开发出来,给用户带来了很大的选择余地&am…

git及GitHub的使用

文章目录 git在本地仓库的使用github使用创建仓库https协议连接(不推荐,现在用起来比较麻烦)ssh连接(推荐)git分支操作冲突处理忽略文件 git在本地仓库的使用 1.在目标目录下右键打开git bash here 2.创建用户名和邮箱(注: 下载完…

网络编程套接字(2): 简单的UDP网络程序

文章目录 网络编程套接字(2): 简单的UDP网络程序3. 简单的UDP网络程序3.1 服务端创建(1) 创建套接字(2) 绑定端口号(3) sockaddr_in结构体(4) 数据的接收与发送接收发送 3.2 客户端创建3.3 代码编写(1) v1_简单发送消息(2) v2_小写转大写(3) v3_模拟命令行解释器(4) v4_多线程版…

[论文阅读笔记26]Tracking Everything Everywhere All at Once

论文地址: 论文 代码地址: 代码 这是一篇效果极好的像素级跟踪的文章, 发表在ICCV2023, 可以非常好的应对遮挡等情形, 其根本的方法在于将2D点投影到一个伪3D(quasi-3D)空间, 然后再映射回去, 就可以在其他帧中得到稳定跟踪. 这篇文章的方法不是很好理解, 代码也刚开源, 做一…

兵力集中更容易进攻获胜

我兵力集中,敌兵力分散,进攻可胜 【安志强趣讲《孙子兵法》第21讲】 【原文】 进而不可御者,冲其虚也;退而不可追者,速而不可及也。 【趣讲白话】 进攻时,敌人无法抵御,那是攻击了敌人空虚的地方…

小程序input的placeholder不垂直居中的问题解决

input的placeholder不垂直居中&#xff0c;input设置高度后&#xff0c;使用line-height只能使输入的文字垂直居中&#xff0c;但是placeholder不会居中&#xff0c;反而会偏上。 首先placeholder样式自定义 有两种方法&#xff0c;第一种行内样式&#xff1a; <input ty…

大彩串口屏使用记录

写在最前面 屏幕型号 DC10600M070 IDE VisualTFT&#xff08;官方&#xff09; VSCode&#xff08;lua编程&#xff09; 用之前看一下官方那个1小时的视频教程就大概懂控件怎么用了&#xff0c;用官方的软件VisualTFT很简单 本文只是简单记录遇到的一些坑 lua编辑器 VisualTF…

uview ui 查看版号

版本查询2种方式 有两种方式可以查询到正在使用的uView的版本&#xff1a; // 通过console.log打印的形式 console.log(uni.$u.config.v);// 可以查阅uView的配置文件得知当前版本号&#xff0c;具体位置为&#xff1a; /uview-ui/libs/config/config.js

Linux centos7 bash编程(小练习)

一、打印九九乘法口诀 这一个for循环嵌套的小练习&#xff0c;难度不大。提供一种写法&#xff0c;供参考&#xff1a; #!/bin/bash # 文件名&#xff1a;99table.sh # 打印输出九九乘法口诀表 for i in {1..9} do for ((j1;j<$i;j)) do …

R-Meta分析核心技术教程

详情点击链接&#xff1a;全流程R-Meta分析核心技术教程 一&#xff0c;Meta分析的选题与检索 1、Meta分析的选题与文献检索 1)什么是Meta分析 2)Meta分析的选题策略 3)精确检索策略&#xff0c;如何检索全、检索准 4)文献的管理与清洗&#xff0c;如何制定文献纳入排除标准 …

【Git】测试持续集成——Git+Gitee+PyCharm

文章目录 概述一、使用Gitee1. 注册账号2. 绑定邮箱3. 新建仓库4. 查看项目地址 二、安装配置Git1. 下载安装包2. 校验是否安装成功。3. 配置Git4. Git命令5. Git实操 三、PyCharmGit1. 配置Git2. Clone项目3. 提交文件到服务器4. 从服务器拉取文件 概述 持续集成&#xff08;…

开源在物联网(IoT)中的应用

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

react解决死循环方法?

使用useeffect&#xff08;副作用&#xff09;方法结束这个操作 1、导入useeffect、useState 2、把下方代码写入&#xff1a;里面填写的是你要终止某个东西的代码 注意&#xff1a;不可不写&#xff0c;也可以写依赖或不写