Spark SQL 中DataFrame DSL的使用

在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。

文章链接:

一、单词统计案例引入

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心数据类型不太一样** 1、读取数据构建一个DataFrame,相当于一张表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定读取数据的格式.schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割.option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径/*** DSL: 类SQL语法 api  介于代码和纯sql之间的一种api** spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数* 如果想要在DSL语法中使用这些函数,需要导入隐式转换**///导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._//    linesDF.select(explode(split($"line","\\|")) as "word")
//      .groupBy($"word")
//      .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存数据*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}

注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)

二、数据源获取

查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

 1、csv-->json

    val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多种类型数据源读取演示").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数import org.apache.spark.sql.functions._//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段import sparkSession.implicits._/*** 读csv格式的文件-->写到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")

2、json-->parquet

val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段/*** 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构*///1500100967,能映秋,21,女,文科五班//方式1://    val studentsJsonDF: DataFrame = sparkSession.read//      .format("json")//      .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:实际上也是调用方式1,只是更简洁了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")

3、parquet-->csv

    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._/*** parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")

4、数据库

下面我们以mysql为例:

    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("连接数据库").config("spark.sql.shuffle.partitions",1) //默认分区的数量是200个.getOrCreate()/*** 读取数据库中的数据,mysql* 如果链接失败,可以将参数补全:jdbc:mysql://192.168.19.100:3306?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false*/val jdDF: DataFrame = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://192.168.19.100:3306?useSSL=false").option("dbtable", "bigdata29.emp").option("user", "root").option("password", "123456").load()jdDF.show(10,truncate = false)

三、DataFrame DSL API的使用

1、select


import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函数演示").getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函数*///方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类studentsDF.select("id", "name", "age")//方式2:弥补了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隐式转换函数中的$将字段变为一个对象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用对象对字段进行处理
//    stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show()       //不可使用未变为对象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age")                 // +是函数,可以等价于该语句//3.2可以在select中使用sql函数studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}

2、where

    /*** where函数:过滤数据*///方式1:直接将sql中的where语句以字符串形式传参studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列对象形式过滤/*** 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()

3、groupBy和agg

    /*** groupby:分组函数     agg:聚合函数* 注意:* 1、groupby与agg函数通常都是一起使用* 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段* 3、分组聚合之后select中无法出现不是分组的字段*///需求:根据班级分组,求每个班级的人数和平均年龄studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()

4、join

/*** 5、join:表关联*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//关联场景1:所关联的字段名字一样studentsDF.join(subjectDF1,"id")//关联场景2:所关联的字段名字不一样studentsDF.join(subjectDF2,$"id"===$"sid","inner")
//    studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类* * @param joinType Type of join to perform. Default `inner`. Must be one of:* *                 `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* *                 `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/

5、开窗

    /*** 开窗函数* 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的* 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。**///需求:统计每个班级总分前三的学生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()

注意:

      DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。

在构建 Spark DataFrame 转换和操作时,常用流程介绍:

  1. 选择数据源:使用 spark.read 或从其他 DataFrame 派生。
  2. 转换:使用各种转换函数(如 selectfiltermapflatMapjoin 等)来修改 DataFrame。
  3. 聚合:使用 groupBy 和聚合函数(如 sumavgcount 等)对数据进行分组和汇总。
  4. 排序:使用 orderBy 或 sort 对数据进行排序。
  5. 输出:使用 showcollectwrite 等函数将结果输出到控制台、收集到驱动程序或写入外部存储。

四、RDD与DataFrame的转换

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd与Df之间的转换").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/334316.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙ArkUI-X跨语言调用说明:【平台桥接开发指南(Android)BridgePlugin】

BridgePlugin (平台桥接) 本模块提供ArkUI端和Android平台端消息通信的功能&#xff0c;包括数据传输、方法调用和事件调用。需配套ArkUI端API使用&#xff0c;ArkUI侧具体用法请参考[Bridge API]。 说明&#xff1a; 开发前请熟悉鸿蒙开发指导文档&#xff1a; gitee.com/li-…

采用java语言+B/S架构+后端SpringBoot前端Vue开发的ADR药品不良反应智能监测系统源码

采用java语言&#xff0b;B/S架构&#xff0b;后端SpringBoot前端Vue开发的ADR药品不良反应智能监测系统源码 ADR监测引擎每日主动获取检验数据、病历内容&#xff08;可拓展&#xff09;、以及其他临床数据&#xff0c;根据知识库内容自动判定患者是否有不良反应迹象&#xf…

Diffusion Model, Stable Diffusion, Stable Diffusion XL 详解

文章目录 Diffusion Model生成模型DDPM概述向前扩散过程前向扩散的逐步过程前向扩散的整体过程 反向去噪过程网络结构训练和推理过程训练过程推理过程优化目标 详细数学推导数学基础向前扩散过程反向去噪过程 Stable Diffusion组成结构运行流程网络结构变分自编码器 (VAE)文本编…

探索Python中的随机数生成与统计分析

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、随机数的魅力与实用性 1. 随机数生成基础 2. 批量生成随机数 二、随机数的高级应用&a…

数字化工厂怎么收集,处理数据?

数字化工厂的数据收集与处理 数字化工厂是现代化工厂&#xff0c;利用数字技术和数据分析提高效率和优化流程。数据分析作为数字化工厂的核心技术&#xff0c;对数据的获取与处理至关重要。在数字化工厂中&#xff0c;数据的来源包括企业内部信息系统、物联网信息以及外部信息&…

《智能水表计量平台技术架构:数字化管理助力节水环保》

随着科技的不断发展&#xff0c;智能水表计量平台作为一种新型的水资源管理工具&#xff0c;正在逐渐受到关注和应用。本文将深入探讨智能水表计量平台的技术架构设计与实现&#xff0c;以及如何通过数字化管理助力节水环保事业。 ### 1. 系统架构概述 智能水表计量平台的技术…

Jenkins的Pipeline流水线

目录 前言 流水线概念 什么是流水线 Jenkins流水线 pipeline node stage step 创建一个简单的流水线 创建Pipeline项目 选择模板 测试 前言 提到 CI 工具&#xff0c;首先想到的就是“CI 界”的大佬——Jenkjns,虽然在云原生爆发的年代,蹦出来了很多云原生的 CI 工具…

VPN的详细理解

VPN&#xff08;Virtual Private Network&#xff0c;虚拟私人网络&#xff09;是一种在公共网络上建立加密通道的技术&#xff0c;通过这种技术可以使远程用户访问公司内部网络资源时&#xff0c;实现安全的连接和数据传输。以下是对VPN的详细介绍&#xff1a; 选择代理浏览器…

Python项目:数据可视化_下载数据【笔记】

源自《Python编程&#xff1a;从入门到实践》 作者&#xff1a; Eric Matthes 02 下载数据 2.1 sitka_weather_07-2021_simple.csv from pathlib import Path import matplotlib.pyplot as plt import csv from datetime import datetimepath Path(D:\CH16\sitka_weather_0…

在链游中,智能合约如何被用于实现游戏内的各种功能

随着区块链技术的快速发展&#xff0c;链游&#xff08;Blockchain Games&#xff09;作为区块链技术的重要应用领域之一&#xff0c;正逐渐展现出其独特的魅力和优势。其中&#xff0c;智能合约作为链游的核心技术之一&#xff0c;对于实现游戏内的各种功能起到了至关重要的作…

【MySQL】初识数据库

序言 在接触到新知识时&#xff0c;相信各位都会有一种陌生以及想逃避的感觉&#xff0c;但是一旦克服了这种万事开头难的感觉&#xff0c;之后就犹如拨开云雾见天明&#xff0c;并且随着一步一个脚印地走下去&#xff0c;时间久了再回过头来看相信各位一定都会发出轻舟已过万重…

【vue-4】遍历数组或对象v-for

1、遍历数组 <ul><li v-for"(value,index) in web.number">index>{{index}}:value>{{value}}</li> </ul> 知识点&#xff1a; <ul>标签定义无序列表 举例&#xff1a; <ul><li>Coffee</li><li>Tea…

本地部署Whisper实现语言转文字

文章目录 本地部署Whisper实现语言转文字1.前置条件2.安装chocolatey3.安装ffmpeg4.安装whisper5.测试用例6.命令行用法7.本地硬件受限&#xff0c;借用hugging face资源进行转译 本地部署Whisper实现语言转文字 1.前置条件 环境windows10 64位 2.安装chocolatey 安装chocol…

java项目级云MES源码(制造执行系统) springboot + vue-element-plus-admin生产制造业MES系统源码

java项目级云MES源码&#xff08;制造执行系统) springboot vue-element-plus-admin生产制造业MES系统源码 MES系统通过信息传递对从订单下达到产品完成的整个生产过程进行优化管理。当工厂发生实时事件时&#xff0c;MES制造执行系统功能的发挥重点体现在及时做出反应、报告&…

家政保洁服务小程序怎么做?家政公司快速搭建专属小程序

在数字化时代背景下&#xff0c;家政保洁服务行业也迎来了线上转型的新机遇。家政保洁服务小程序&#xff0c;作为一种新型的线上服务平台&#xff0c;不仅能够提升家政公司的服务效率&#xff0c;还能为顾客提供更加便捷的预约上门服务体验。那么家政保洁服务小程序怎么做呢&a…

nginx 安全配置

1、前言 前后端分离后&#xff0c;nginx 作为跨域转发工具在日常应用中越来越广泛&#xff0c;它的安全性不能不能忽略。 2、nginx 安装相关说明 2.1 直接下载安装包 在nginx官网下载编译好的安装包&#xff0c;链接地址为nginx: download。如果是linux系统&#xff0c;直接使…

【umi-max】初识 antd pro

修改端口号 根目录下的 .env 文件&#xff1a; PORT8888目录结构 (umijs.org) 新增页面 在 umirc.ts 中进行配置。 新增页面 - Ant Design Pro 这里有一个配置 icon:string&#xff0c;可以在菜单加 icon 图标&#xff0c;默认使用 antd 的 icon 名&#xff0c;默认不适用二…

大咖论道,智慧档案编研知识共享

关注我们 - 数字罗塞塔计划 - 本期直播全程“回放”&#xff0c;主题分享以视频形式&#xff0c;专家们的观点以文字形式&#xff0c;便于细读和收藏。 一、主题分享 详细视频请在 公众号 数字罗塞塔计划 中观看 二、大咖论道 以下内容整理来自直播间&#xff0c;上海市…

多模态MLLM都是怎么实现的(9)-时序LLM是怎么个事儿?

时序预测这东西大家一般不陌生,随便举几个例子 1- 金融,比如预测股票(股市有风险,入市需谨慎),纯用K线做,我个人不太推荐 2- 天气,比如预测云图,天气预报啥的 3- 交通,早晚高峰,堵车啥的,车啥时候加油,啥时候充电之类的 4- 医疗,看你病史和喝酒的剂量建模,看你会…