Day10—Spark SQL基础

在这里插入图片描述

Spark SQL介绍

​ Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如JSON、Parquet、Avro、CSV格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

Spark SQL的主要特点:

  • 将SQL查询与Spark应用程序无缝组合

​ Spark SQL允许使用SQL或熟悉的API在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce的;而Spark SQL底层使用的是Spark RDD。

  • 可以连接到多种数据源

​ Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。

  • 在现有的数据仓库上运行SQL或HiveQL查询

​ Spark SQL支持HiveQL语法以及Hive SerDes和UDF (用户自定义函数) ,允许访问现有的Hive仓库。

DataFrame和DataSet

  • DataFrame的结构

​ DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。

​ DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息) ,因此看起来更像是一张数据库表。例如,在一个RDD中有3行数据,将该RDD转成DataFrame后,其中的数据可能如图所示:
在这里插入图片描述

  • DataSet的结构
    Dataset是一个分布式数据集,是Spark 1.6中添加的一个新的API。相比于RDD, Dataset提供了强类型支持,在RDD的每行数据加了类型约束。
    在这里插入图片描述
    在Spark中,一个DataFrame代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。

Spark SQL的基本使用

​ Spark Shell启动时除了默认创建一个名为sc的SparkContext的实例外,还创建了一个名为spark的SparkSession实例,该spark变量可以在Spark Shell中直接使用。

​ SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。

Spark SQL函数

内置函数

​ Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions

中。其中大部分函数与Hive中的相同。

​ 使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL

语句中使用。

  • 以编程的方式使用lower()函数将用户姓名转为小写/大写,代码如下:
df.select(lower(col("name")).as("greet")).show()
df.select(upper(col("name")).as("greet")).show()

​ 上述代码中,df指的是DataFrame对象,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col(“name”)指定要查询的列,也可以使用$"name"代替,代码如下:

df.select(lower($"name").as("greet")).show()
  • 以SQL语句的方式使用lower()函数,代码如下:
df.createTempView("temp")
spark.sql("select upper(name) as greet from temp").show()

​ 除了可以使用select()方法查询指定的列外,还可以直接使用filter()、groupBy()等方法对DataFrame数据进行过滤和分组,例如以下代码:

df.printSchema()  # 打印Schema信息
df.select("name").show()  # 查询name列
# 查询name列和age列,其中将age列的值增加1
df.select($"name",$"age"+1).show()
df.filter($"age">25).show() # 查询age>25的所有数据
# 根据age进行分组,并求每一组的数量
df.groupBy("age").count().show() 
自定义函数

​ 当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据需求编写自定义函数(User Defined Functions, UDF),然后在Spark SQL中调用。

​ 例如有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位数字用星号()代替,比如手机号180***2688。这时就可以编写一个自定义函数来实现这个需求,实现代码如下:

package spark.demo.sqlimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}/*** 用户自定义函数,隐藏手机号中间4位*/
object SparkSQLUDF {def main(args: Array[String]): Unit = {//创建或得到SparkSessionval spark = SparkSession.builder().appName("SparkSQLUDF").master("local[*]").getOrCreate()//第一步:创建测试数据(或直接从文件中读取)//模拟数据val arr=Array("18001292080","13578698076","13890890876")//将数组数据转为RDDval rdd: RDD[String] = spark.sparkContext.parallelize(arr)//将RDD[String]转为RDD[Row]val rowRDD: RDD[Row] = rdd.map(line=>Row(line))//定义数据的schemaval schema=StructType(List{StructField("phone",StringType,true)})//将RDD[Row]转为DataFrameval df = spark.createDataFrame(rowRDD, schema)//第二步:创建自定义函数(phoneHide)val phoneUDF=(phone:String)=>{var result = "手机号码错误!"if (phone != null && (phone.length==11)) {val sb = new StringBuffersb.append(phone.substring(0, 3))sb.append("****")sb.append(phone.substring(7))result = sb.toString}result}//注册函数(第一个参数为函数名称,第二个参数为自定义的函数)spark.udf.register("phoneHide",phoneUDF)//第三步:调用自定义函数df.createTempView("t_phone")		//创建临时视图spark.sql("select phoneHide(phone) as phone from t_phone").show()// +-----------+// |      phone|// +-----------+// |180****2080|// |135****8076|// |138****0876|// +-----------+}
}
窗口(开窗)函数

​ 开窗函数是为了既显示聚合前的数据,又显示聚合后的数据,即在每一行的最后一列添加聚合函数的结果。开窗口函数有以下功能:

  • 同时具有分组和排序的功能
  • 不减少原表的行数
  • 开窗函数语法:

聚合类型开窗函数

sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]]) 

排序类型开窗函数

ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])
  • 以row_number()开窗函数为例:

​ 开窗函数row_number()是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排列的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TopN)。row_number()函数的使用格式如下:

row_number() over (partition by 列名 order by 列名 desc) 行号列别名

上述格式说明如下:

partition by:按照某一列进行分组;

order by:分组后按照某一列进行组内排序;

desc:降序,默认升序。

例如,统计每一个产品类别的销售额前3名,代码如下:

package spark.demo.sqlimport org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}/*** 统计每一个产品类别的销售额前3名(相当于分组求TOPN)*/
object SparkSQLWindowFunctionDemo {def main(args: Array[String]): Unit = {//创建或得到SparkSessionval spark = SparkSession.builder().appName("SparkSQLWindowFunctionDemo").master("local[*]").getOrCreate()//第一步:创建测试数据(字段:日期、产品类别、销售额)val arr=Array("2019-06-01,A,500","2019-06-01,B,600","2019-06-01,C,550","2019-06-02,A,700","2019-06-02,B,800","2019-06-02,C,880","2019-06-03,A,790","2019-06-03,B,700","2019-06-03,C,980","2019-06-04,A,920","2019-06-04,B,990","2019-06-04,C,680")//转为RDD[Row]val rowRDD=spark.sparkContext.makeRDD(arr).map(line=>Row(line.split(",")(0),line.split(",")(1),line.split(",")(2).toInt))//构建DataFrame元数据val structType=StructType(Array(StructField("date",StringType,true),StructField("type",StringType,true),StructField("money",IntegerType,true)))//将RDD[Row]转为DataFrameval df=spark.createDataFrame(rowRDD,structType)//第二步:使用开窗函数取每一个类别的金额前3名df.createTempView("t_sales")		//创建临时视图//执行SQL查询spark.sql("select date,type,money,rank from " +"(select date,type,money," +"row_number() over (partition by type order by money desc) rank "+"from t_sales) t " +"where t.rank<=3").show()}
}

在这里插入图片描述

结果展示

在这里插入图片描述

小结

本次学习了Spark SQL基础,学习Spark SQL基础是掌握大数据处理的关键一步。Spark SQL是Apache Spark的一个模块,它提供了对结构化和半结构化数据的高效处理能力。通过学习Spark SQL,你将能够使用SQL查询和DataFrame API来分析数据集。Spark SQL的核心优势在于其能够处理大规模数据集,同时保持高性能。它支持多种数据源,包括HDFS、S3、Parquet等,使得数据的读写变得简单。此外,Spark SQL还提供了丰富的数据类型和复杂的数据操作功能,如过滤、分组、排序和聚合。学习过程中,你将了解如何创建DataFrame,执行转换和操作,以及如何使用SQL语句进行查询。你还将学习到如何优化Spark SQL查询,包括使用分区、索引和缓存技术来提高性能。

掌握Spark SQL基础对于数据工程师和分析师来说非常重要,因为它不仅可以提高数据处理的效率,还可以帮助你更好地理解和分析大规模数据集。随着你的学习深入,你将能够更有效地利用Spark的强大功能来解决实际问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/357522.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

react18 实现具名插槽

效果预览 技术要点 当父组件给子组件传递的 JSX 超过一个标签时&#xff0c;子组件接收到的 children 是一个数组&#xff0c;通过解析数组中各 JSX 的属性 slot &#xff0c;即可实现具名插槽的分发&#xff01; 代码实现 Father.jsx import Child from "./Child";…

OGG几何内核开发-复杂装配模型读取、显示、分析

OGG几何内核读取STEP模型文件的API有STEPCAFControl_Reader、STEPControl_Reader。 STEPCAFControl_Reader使用很复杂&#xff0c;但可以展示装配树&#xff0c;有利于模型的详细分析。 本文演示了《插件化算法研究平台V2》的OCC几何模型插件的部分功能&#xff1a;显示装配树…

Golang | Leetcode Golang题解之第172题阶乘后的零

题目&#xff1a; 题解&#xff1a; func trailingZeroes(n int) (ans int) {for n > 0 {n / 5ans n}return }

pytets测试框架中如果需要运行多个测试套件时pytest.ini文件设置

pytets测试框架中如果需要运行多个测试套件时pytest.ini文件设置方法 testpaths testcases/fenmi testcases/weixin testcases/Zgen

QT实现人脸识别

QT实现人脸识别 Face.pro文件&#xff1a; QT core guigreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any Qt feature that has been marked deprecated (the exact warnings # d…

【MATLAB】(高数)

参考文章 函数极限 导数与偏导 极值和最值 局部范围的最值 局部范围内的最值&#xff0c;相当于函数的极值 离散数据的最值 多元函数的极值 fminunc [x, fval] fminunc(fun, x0)fun为代求极值的函数&#xff1b;x0为起始点&#xff0c;即从这个点开始寻找极值&#xff0c;…

数据结构9——排序

一、冒泡排序 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff0c;顾名思义&#xff0c;就是指越小的元素会经由交换慢慢“浮”到数列的顶端。 算法原理 从左到右&#xff0c;依次比较相邻的元素大小&#xff0c;更大的元素交换到右边&#xff1b;从第一组相邻元素比较…

贪心算法——赶作业(C++)

慢慢来&#xff0c;沉稳一点。 2024年6月18日 题目描述 A同学有n份作业要做&#xff0c;每份作业有一个最后期限&#xff0c;如果在最后期限后交作业就会扣分&#xff0c;现在假设完成每份作业都需要一天。A同学想安排作业顺序&#xff0c;把扣分降到最低&#xff0c;请帮他实…

工业园安全生产新保障:广东地区加强可燃气体报警器校准检测

广东&#xff0c;作为我国经济的重要引擎&#xff0c;拥有众多工业园区。 这些工业园区中&#xff0c;涉及化工、制药、机械制造等多个领域&#xff0c;每天都会产生和使用大量的可燃气体。因此&#xff0c;可燃气体报警器的安装与校准检测&#xff0c;对于保障工业园区的安全…

XSS漏洞实验

本篇为xss漏洞实验练习&#xff0c;练习网址来源于网络 练习网址&#xff1a;XSS平台|CTF欢迎来到XSS挑战|XSS之旅|XSS测试 一、前置说明 在测试过程中&#xff0c;有哪些东西是我们可以利用来猜测与判断的&#xff1a; 网页页面的变化&#xff1b;审查网页元素&#xff1b;查…

广州化工厂可燃气体报警器检定检验:安全生产新举措显成效

随着科技的不断发展&#xff0c;可燃气体报警器的检定检验技术也在不断进步。 广州的一些化工厂开始采用先进的智能检测系统和数据分析技术&#xff0c;对报警器的性能进行更加精准和全面的评估。 这些新技术不仅能够提高检定检验的效率和准确性&#xff0c;还能够为化工厂的…

批量重命名神器揭秘:一键实现文件夹随机命名,自定义长度轻松搞定!

在数字化时代&#xff0c;我们经常需要管理大量的文件夹&#xff0c;尤其是对于那些需要频繁更改或整理的文件来说&#xff0c;给它们进行批量重命名可以大大提高工作效率。然而&#xff0c;传统的重命名方法既繁琐又耗时&#xff0c;无法满足高效工作的需求。今天&#xff0c;…

网管工作实践_02_IP/MAC地址管理工具

1、ipconfig命令格式及参数 ipconfig是内置于Windows的TCP/IP应用程序&#xff0c;用于显示本地计算机网络适配器的MAC地址和IP地址等配置信息&#xff0c;这些信息一般用来榆验手动配置的TCP/IP设置是否正确。当在网络中使用 DHCP服务时&#xff0c;IPConfig可以检测计算机中分…

k8s上尝试滚动更新和回滚

滚动更新和回滚 实验目标&#xff1a; 学习如何进行应用的滚动更新和回滚操作。 实验步骤&#xff1a; 创建一个 Deployment。更新 Deployment 的镜像版本&#xff0c;观察滚动更新过程。回滚到之前的版本&#xff0c;验证回滚操作。 今天呢&#xff0c;我们继续来进行我们k…

远程医疗软件到底哪个好用?

随着科技进步的不断推进&#xff0c;远程医疗已经成为现代医疗体系的一个重要支柱。远程医疗软件&#xff0c;通过网络通信技术的运用&#xff0c;打破了地理限制&#xff0c;实现了医疗资源的有效整合与共享&#xff0c;为民众提供了前所未有的便捷高效的医疗服务体验。那么&a…

【C++】初始化列表、匿名对象、static成员、友元、内部类

文章目录 一、初始化列表构造函数体赋值初始化列表explicit关键字 二、匿名对象三、static成员四、友元友元函数友元类 五、内部类六、练习题 一、初始化列表 构造函数体赋值 实际上&#xff0c;构造函数的函数体内&#xff0c;并不是对 对象 初始化的地方&#xff0c;而是对…

Spring框架的核心原则和IoC容器介绍

Spring框架是一个开源的应用程序框架&#xff0c;它遵循以下核心原则&#xff1a; 1.Inversion of Control&#xff08;控制反转&#xff09;: Spring框架通过IoC容器管理对象的生命周期和依赖关系&#xff0c;而不是由程序代码直接创建对象。这样可以降低组件之间的耦合度&…

三、MyBatis实践:提高持久层数据处理效率

三、MyBatis实践&#xff1a;提高持久层数据处理效率 目录 一、Mybatis简介 1.1 简介1.2 持久层框架对比1.3 快速入门&#xff08;基于Mybatis3方式&#xff09; 二、MyBatis基本使用 2.1 向SQL语句传参 2.1.1 mybatis日志输出配置2.1.2 #{}形式2.1.3 ${}形式 2.2 数据输入 2…

记MySQL事务+消息队列引起的问题

问题描述&#xff1a; 先说一下流程&#xff1a;后端保存前端提交的图表信息&#xff0c;然后发送异步消息到消息队列&#xff0c;由下游服务去处理图表信息。 部署项目到服务器&#xff0c;验证项目功能的时候&#xff0c;出现了以下错误&#xff1a;数据库存在数据。下游服…

《Python 机器学习》作者新作:从头开始构建大型语言模型,代码已开源

ChatGPT狂飙160天&#xff0c;世界已经不是之前的样子。 更多资源欢迎关注 自 ChatGPT 发布以来&#xff0c;大型语言模型&#xff08;LLM&#xff09;已经成为推动人工智能发展的关键技术。 近期&#xff0c;机器学习和 AI 研究员、畅销书《Python 机器学习》作者 Sebastian …