【大数据学习 | Spark-SQL】定义UDF和DUAF,UDTF函数

1. UDF函数(用户自定义函数)

一般指的是用户自己定义的单行函数。一进一出,函数接受的是一行中的一个或者多个字段值,返回一个值。比如MySQL中的,日期相关的dateDiff函数,字符串相关的substring函数。

先准备数据:

1.1 导入必要的包

首先,确保导入必要的Spark包:

import org.apache.spark.sql.SparkSession

1.2 创建SparkSession

创建一个SparkSession对象,这是与Spark交互的入口。

1.3 定义UDF并注册到SparkSQL

定义一个Scala函数,并将其注册为UDF。示例

1.4 使用UDF在SQL查询中:

调用udf的register方法,第一个参数是udf函数的函数名,第二个参数是要注册为UDF的函数。

session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})

1.5 代码:

尽量使用SparkSQL的sql形式的写法,api写法太麻烦了。

object TestUDF{def main(args: Array[String]): Unit = {val session = SparkSession.builder().master("local[*]").appName("testUDF").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3).toInt)}).toDF("id", "name", "salary", "bonus")session.udf.register("all_income",(sal:Int,bonus:Int)=>{sal*12 + bonus})import org.apache.spark.sql.functions
//    df.withColumn("all",functions.callUDF("all_income",$"salary",$"bonus"))
//      .select("id","name","all")
//      .show()df.createTempView("salary")session.sql("""|select id,name,all_income(salary,bonus) all from salary|""".stripMargin).show()}
}

输出:

2. UDAF(用户自定义的聚合函数)

指的是用户自定义的聚合函数,多进一出,比如MySQL中的,count函数,avg函数。

以学生信息为主进行统计,所有人员的年龄的总和

或者每个性别的年龄的平均值

计算所有人的年龄之和:

package com.atguigu.bigdata.testimport org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator/*** ClassName : TestUDAF* Package : com.atguigu.bigdata.test* Description** @Author HeXua* @Create 2024/11/29 19:09*         Version 1.0*/
object TestUDAF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udaf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt").map(t => {val strs = t.split(" ")(strs(0), strs(1), strs(2).toInt, strs(3))}).toDF("id", "name", "age", "gender")import org.apache.spark.sql.functions._// 注册udaf函数session.udf.register("mysum",udaf(new MySum))df.createTempView("student")session.sql("""|select mysum(age) from student|""".stripMargin).show()}
}
// udaf的类继承Aggregator抽象类
class MySum extends Aggregator[Int,Int,Int]{//初始化def zero: Int = 0//聚合逻辑def reduce(b: Int, a: Int): Int = a+b//整体聚合def merge(b1: Int, b2: Int): Int = b1+b2//最终返回值def finish(reduction: Int): Int = reduction//累加值的类型def bufferEncoder: Encoder[Int] = Encoders.scalaInt//输出结果的类型def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

定义用户自定义聚合函数时,继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。

泛型参数解释:

1. 输入类型(IN)

这是聚合函数的输入类型,即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值,输入类型就是int。

2. 缓冲区类型(BUFFER)

这是聚合函数的中间状态类型,也称为缓冲区类型。

例如你要计算一组整数的平均值,缓冲区可能包含两个字段:总和和计数,因为iBUF可能是一个元组。

3. 输出类型(OUT)

这是聚合函数的最终输出类型,即finish方法返回的类型。例如你要计算平均值,最终输出类型是Double。

方法解释:

zero:初始化缓冲区的值,对于平均值计算,初始化和计数都是0。

reduce:更新缓冲区,每次传入一个新的输入值时,更新总和和计数。

finish:计算最终结果,根据缓冲区中的总和和计数,计算平均值。

bufferEncoder:定义缓冲区类型的编码器,用于序列化和反序列化缓冲区。

outputEncoder:定义最终输出类型的编码器,用于序列化和反序列化输出结果。

计算每个性别的年龄的平均值:

case class AggragateVo(var cnt:Int,var sum:Int)
object MyAvg extends Aggregator[Int,AggragateVo,Double]{override def zero: AggragateVo = AggragateVo(0,0)override def reduce(b: AggragateVo, a: Int): AggragateVo = {b.cnt += 1b.sum += ab}override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo = {b1.cnt += b2.cntb1.sum += b2.sumb1}override def finish(reduction: AggragateVo): Double = {reduction.sum.toDouble /reduction.cnt}override def bufferEncoder: Encoder[AggragateVo] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

3. UDTF(用户自定义炸裂函数)

拆分函数,进入的是一行内容出现的结果是多行内容。

spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。

import org.apache.spark.sql.SparkSessionobject TestUDTF {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("test udtf").master("local[*]").getOrCreate()import session.implicits._val df = session.sparkContext.textFile("file:///headless/workspace/spark/data/m.txt").map(t => {val strs = t.split(",")(strs(0), strs(1), strs(2))}).toDF("id", "name", "actors")//explode map arraydf.createTempView("movies")session.sql("""|select id,name,actor  from movies lateral view explode(split(actors,'\\|')) t as actor|""".stripMargin).createTempView("movies1")session.sql("""|select count(1),actor from movies1 group by actor|""".stripMargin).show()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/484471.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 服务器 一次性查看 CPU、内存和磁盘使用情况

创建 vi check_usage.sh #!/bin/bashecho " CPU 使用率 " mpstat -P ALL 1 1echo -e "\n 内存使用情况 " free -hecho -e "\n 磁盘使用率 " df -h执行授权 chmod x check_usage.sh执行查看 ./check_usage.sh这样可以快速获取系统资源的概览。…

一文理解多模态大语言模型——下

作者:Sebastian Raschka 博士, 翻译:张晶,Linux Fundation APAC Open Source Evangelist 编者按:本文并不是逐字逐句翻译,而是以更有利于中文读者理解的目标,做了删减、重构和意译&#xff0c…

数据结构---链表(2)---双向链表

链表(1)中讲过了在OJ题中出现很多并且能作为一些复杂数据结构子结构的不带头单向不循环链表,下面讲解应用很广很实用的带头双向循环链表。 三、双向链表---DoublyLinkedList 演示带头双向循环链表(实用)。 带头--->不需要对空链表继续单独判断;循环…

PH热榜 | 2024-12-04

1. Stackfix 标语:几秒钟内就能对比软件。 介绍:立刻就能对比不同软件的价格和功能。不用再费力看各种评测或接推销电话了。我们提供实时价格、并排对比,还有专家的推荐建议。 产品网站: 立即访问 Product Hunt: V…

分布式光伏电站如何实现监控及集中运维管理?

安科瑞戴婷 Acrel-Fanny 前言 今年以来,在政策利好推动下光伏、风力发电、电化学储能及抽水蓄能等新能源行业发展迅速,装机容量均大幅度增长,新能源发电已经成为新型电力系统重要的组成部分,同时这也导致新型电力系统比传统的电…

AD学习笔记·空白工程的创建

编写不易,禁止搬运,仅供学习,感谢理解 序言 本文参考B站,凡亿教育,连接放在最后。 创建工程文件 在使用AD这个软件的电路板设计中,有很多的地方跟嘉立创eda还是有不一样的地方,其中一个地方就…

基于Java Springboot生鲜食品订购微信小程序

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 微信…

【阅读笔记】Android广播的处理流程

关于Android的解析,有很多优质内容,看了后记录一下阅读笔记,也是一种有意义的事情, 今天就看看“那个写代码的”这位大佬关于广播的梳理, https://blog.csdn.net/a572423926/category_11509429.html https://blog.c…

第十三章 Linux计划任务

注意:进公司和有公司成员离职,一定要问计划任务,防止别人搞破坏背锅 13.1 一次性计划任务(atd服务) 1 安装 atd 服务 yum install -y at systemctl enable atd systemctl start atd ## 启动atd服务 systemctl status atd ## 查看atd服务…

Kali Linux使用Netdiscover工具的详细教程

Kali Linux使用Netdiscover工具的详细教程 引言 在网络安全和渗透测试的过程中,网络发现是一个至关重要的步骤。Netdiscover是Kali Linux中一个非常实用的网络发现工具,它可以帮助用户快速识别局域网中的活动设备。本文将详细介绍如何使用Netdiscover工…

EasyNVR中HTTP-FLV协议无法播放怎么解决?

在科技日新月异的今天,摄像头作为公共安全领域的重要一环,其技术的不断提升正显著地改变着社会的安全格局。从最初的简单监控到如今的高清智能分析,我们可以对特定区域进行实时监控和记录,为社会的安全稳定提供了强有力的保障。 问…

VINS_MONO视觉导航算法【一】基础知识介绍

文章目录 VINS-Mono其他文章说明简介单目相机存在的尺度不确定问题缺乏深度信息尺度等价性对极几何和三角化平移和深度的关系解决尺度不确定问题的方法视觉惯性里程计(VIO)初始尺度估计持续尺度校正 摄像头数据处理直接法(Direct Method&…

「Mac畅玩鸿蒙与硬件42」UI互动应用篇19 - 数字键盘应用

本篇将带你实现一个数字键盘应用,支持用户通过点击数字键输入数字并实时更新显示内容。我们将展示如何使用按钮组件和状态管理来实现一个简洁且实用的数字键盘。 关键词 UI互动应用数字键盘按钮组件状态管理用户交互 一、功能说明 数字键盘应用将实现以下功能&…

AI PC处理器ARM架构-引入NPU和大模型

AI PC处理器架构变化:ARM低功耗、引入NPU和大模型 AI进化加速端侧落地,新一轮浪潮蓄势待发(2024)”。ARM(Advanced RISC Machine)架构和x86架构是两种主要的处理器架构,它们在设计理念、应用场景和性能特点等方面有显著的差异。 ARM架构是一…

华为的USG6000为什么不能ping通

前言: 防火墙usg6000v的镜像 链接: https://pan.baidu.com/s/1uLRk0-hnHRTLYLx1Pnplow?pwdtymp 提取码: tymp 看了好多毒文章,感觉写作业更有意思,可以了解新的知识 内容: 首先看毒文章是这样说的,华为的防火墙是…

Mac安装MINIO服务器实现本地上传和下载服务

0.MINIO学习文档 Minio客户端mc使用 | Elibaron学习笔记 1.Mac安装MINIO 中文官方网址:MinIO下载和安装 | 用于创建高性能对象存储的代码和下载内容 (1) brew 安装 brew install minio/stable/minio (2)安装完成,执行brew i…

墨者学院-登录密码重置漏洞分析

声明! 文章所提到的网站以及内容,只做学习交流,其他均与本人无关,切勿触碰法律底线,否则后果自负!!!! 目录标题 前言解题过程总结 前言 在实际渗透测试中,登…

qt QPauseAnimation详解

1、概述 QPauseAnimation是Qt框架中的一个类,专门用于在动画序列中添加暂停效果。它继承自QAbstractAnimation,允许在动画组或动画序列中指定一个时间段的暂停。这对于创建复杂的动画序列非常有用,可以让动画在特定时刻暂停并保持状态。通过…

【热门主题】000076 探索单片机的奥秘:原理、编程与应用全解析

前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 【热…

SpringMVC(1)

前言 1. SpringMVC简介 2. 入门案例 第一步导入坐标,SpringMVC和servlet 这样其实就把我们要用的Spring相关的都用上了 第三步就是加载这个bean 写配置类 第四步做一个Tomcat容器启动的配置 还要加上Tomcat插件 我们在创建一个快捷方式 注意由于我的JDK版本高…