7.spark sql编程

概述

spark 版本为 3.2.4,注意 RDDDataFrame 的代码出现的问题及解决方案

本文目标如下:

  • RDD ,Datasets,DataFrames 之间的区别
  • 入门
    • SparkSession
    • 创建 DataFrames
    • DataFrame 操作
    • 编程方式运行 sql 查询
    • 创建 Datasets
    • DataFramesRDDs 互相转换
      • 使用反射推断模式
      • 编程指定 Schema

参考 Spark 官网

相关文章链接如下

文章链接
spark standalone环境安装地址
Spark的工作与架构原理地址
使用spark开发第一个程序WordCount程序及多方式运行代码地址
RDD编程指南地址
RDD持久化地址

RDD ,Datasets,DataFrames 之间的区别

Datasets , DataFrames和 RDD

Dataset 是一个分布式的数据集合,DatasetSpark 1.6 中添加的一个新接口,它增益了 RDD (强类型,可以使用 lambda 函数的能力) 和 Spark sql 优化执行引擎的优势。Dataset 可以由JVM对象构建,然后使用函数转换(map、flatMap、filter等)进行操作。数据集API有Scala和Java版本。Python不支持数据集API。

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的DataFrame APIScalaJavaPythonR中可用。在Scala API中,DataFrame只是Dataset[Row]的一个类型别名。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

DataFrame=RDD+SchemaRDD可以认为是表中的数据,Schema是表结构信息。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

入门

Spark SQL是一个用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多关于正在执行的数据结构信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与SparkSQL进行交互,包括SQLDataset API。计算结果时,使用相同的执行引擎,与用于表示计算的API/语言无关。方便用户切换不同的方式进行操作

people.json

people.json文件准备
在这里插入图片描述

SparkSession

Spark sql 中所有功能入口点是 SparkSession类。创建一个基本的 SparkSession,只需使用 SparkSession.builder()

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

创建 DataFrames

使用 SparkSession,通过存在的RDDhive 表,或其它的Spark data sources 程序创建 DataFrames

val df = spark.read.json("/tmp/people.json")
df.show()

执行如下图
在这里插入图片描述

DataFrame 操作

使用数据集进行结构化数据处理的基本示例如下

// 需要引入 spark.implicits._ 才可使用 $
// This import is needed to use the $-notation
import spark.implicits._
// 打印schema 以树格式
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)// 仅显示 name 列
// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+
// 显示所有,age 加1
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+// 过滤 人的 age 大于 21
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// 按 age 分组统计
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

spark-shell 执行如下图
在这里插入图片描述
在这里插入图片描述

编程方式运行 sql 查询

df.createOrReplaceTempView("people")val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

执行如下:

scala> df.createOrReplaceTempView("people")scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

创建 Datasets

Datasets类似于RDD,不是使用Java序列化或Kryo,而是使用专门的编码器来序列化对象,以便通过网络进行处理或传输。使用的格式允许Spark执行许多操作,如过滤、排序和哈希,而无需将字节反序列化为对象。

case class Person(name: String, age: Long)// 为 case classes 创建编码器
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()// 为能用类型创建编码器,并提供 spark.implicits._ 引入 
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)// 通过定义类,将按照名称映射,DataFrames 能被转成 Dataset 
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "/tmp/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()

执行如下:

scala> case class Person(name: String, age: Long)
defined class Personscala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]scala> caseClassDS.show()
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+scala> val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]scala> primitiveDS.map(_ + 1).collect()
res1: Array[Int] = Array(2, 3, 4)scala> val path = "/tmp/people.json"
path: String = /tmp/people.jsonscala> val peopleDS = spark.read.json(path).as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]scala> peopleDS.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

DataFrames 与 RDDs 互相转换

Spark SQL支持两种不同的方法将现有RDD转换为Datasets

  • 第一种方法使用反射来推断包含特定类型对象的RDD的模式。这种基于反射的方法可以生成更简洁的代码,当知道 schema 结构的时间,会有更好的效果。
  • 第二种方法是通过编程接口,构造 schema,然后将其应用于现有的RDD。虽然此方法更详细,直至运行时,才能知道他们的字段和类型,用于构造 Datasets

使用反射推断模式

代码如下:

object RddToDataFrameByReflect {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("RddToDataFrameByReflect").master("local").getOrCreate()// 用于从RDD到DataFrames的隐式转换// For implicit conversions from RDDs to DataFramesimport spark.implicits._// Create an RDD of Person objects from a text file, convert it to a Dataframeval peopleDF = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()// Register the DataFrame as a temporary viewpeopleDF.createOrReplaceTempView("people")// SQL statements can be run by using the sql methods provided by Sparkval teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")// The columns of a row in the result can be accessed by field indexteenagersDF.map(teenager => "Name: " + teenager(0)).show()// or by field nameteenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()}case class Person(name: String, age: Long)
}

执行如下图:
在这里插入图片描述

编码问题

关于 Spark 官网 上复杂类型编码问题,直接加下面一句代码

teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))

报以下图片错误
在这里插入图片描述
将原有代码改变如下:

 // 没有为 Dataset[Map[K,V]] 预先定义编码器,需要自己定义// No pre-defined encoders for Dataset[Map[K,V]], define explicitlyimplicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]// 也可以如下操作// Primitive types and case classes can be also defined as// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect().foreach(println(_))// Array(Map("name" -> "Justin", "age" -> 19))

在这里插入图片描述
通过这一波操作,就可以理解什么情况下,需要编码器,以及编码器的作用

编程指定 Schema

代码如下:

object RddToDataFrameByProgram {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local").getOrCreate()import org.apache.spark.sql.Rowimport org.apache.spark.sql.types._// 加上此解决报错问题import spark.implicits._// Create an RDDval peopleRDD = spark.sparkContext.textFile("/Users/hyl/Desktop/fun/sts/spark-demo/people.txt")// The schema is encoded in a stringval schemaString = "name age"// Generate the schema based on the string of schemaval fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))val schema = StructType(fields)// Convert records of the RDD (people) to Rowsval rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))// Apply the schema to the RDDval peopleDF = spark.createDataFrame(rowRDD, schema)// Creates a temporary view using the DataFramepeopleDF.createOrReplaceTempView("people")// SQL can be run over a temporary view created using DataFramesval results = spark.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations// The columns of a row in the result can be accessed by field index or by field nameresults.map(attributes => "Name: " + attributes(0)).show()}
}

执行如下图
在这里插入图片描述

官方文档的代码不全问题

Unable to find encoder for type String. An implicit Encoder[String] is needed to store String instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
results.map(attributes => "Name: " + attributes(0)).show()

在这里插入图片描述
加下以下代码

// 加上此解决报错问题
import spark.implicits._

如下图解决
在这里插入图片描述

结束

spark sql 至此结束,如有问题,欢迎评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/182033.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动态规划(Dynamic Programming)—— Java解释

一、基本思想 动态规划(Dynamic Programming)算法的核心思想是:将大问题划分为小问题进行解决,并将子问题的求解结果存储起来避免重复求解,从而一步步获取最优解的处理算法。 动态规划算法与分治算法类似,其基本思想也是将待求解…

计算机毕设 基于大数据的社交平台数据爬虫舆情分析可视化系统

文章目录 0 前言1 课题背景2 实现效果**实现功能****可视化统计****web模块界面展示**3 LDA模型 4 情感分析方法**预处理**特征提取特征选择分类器选择实验 5 部分核心代码6 最后 0 前言 🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕…

在 Python 中使用 Selenium 按文本查找元素

我们将通过示例介绍在Python中使用selenium通过文本查找元素的方法。 在 Python 中使用 Selenium 按文本查找元素 软件测试是检查应用程序是否满足用户需求的技术。 该技术有助于使应用程序成为无错误的应用程序。 软件测试可以手动完成,也可以通过某些软件完成。…

AI:60-基于深度学习的瓜果蔬菜分类识别

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌在这个漫长的过程,中途遇到了不少问题,但是…

Unity地面交互效果——3、曲面细分基础知识

大家好,我是阿赵。   之前介绍了使用动态法线贴图混合的方式模拟轨迹的凹凸感,这次来讲一下更真实的凹凸感制作。不过在说这个内容之前,这一篇先要介绍一下曲面细分着色器(Tessellation Shader)的用法。 一、为什么要做曲面细分 之前通过法…

docker离线部署

docker离线部署 1、目的 在可以连接互联网的情况下,可以在线安装Docker《Linux下Docker安装部署》,如果遇到内网服务器就没有办法进行在线安装,那么需要使用离线安装的方法。 2、下载安装包 创建工作文件夹: mkdir /opt/dock…

window10 定时任务

window10 定时任务 1、背景2、目标3、思路4、实操4.1、设置定时任务4.2、配置策略4.3、验证 1、背景 项目上由于业务调试需要,开具了一台window10系统,此台window10为项目组公共使用,为防止误操作分配了不通的账号,日常使用各自账…

Redis-使用java代码操作Redis->java连接上redis,java操作redis的常见类型数据存储,redis中的项目应用

java连接上redisjava操作redis的常见类型数据存储redis中的项目应用 1.java连接上redis package com.zlj.ssm.redis;import redis.clients.jedis.Jedis;/*** author zlj* create 2023-11-03 19:27*/ public class Demo1 {public static void main(String[] args) { // …

「Verilog学习笔记」移位运算与乘法

专栏前言 本专栏的内容主要是记录本人学习Verilog过程中的一些知识点,刷题网站用的是牛客网 分析 1、在硬件中进行乘除法运算是比较消耗资源的一种方法,想要在不影响延迟并尽量减少资源消耗,必须从硬件的特点上进行设计。根据寄存器的原理&a…

宏转录组分析揭示不同土壤生境中氮循环基因的表达

发表期刊:msystems 发表时间:2023 影响因子:6.4 DOI: 10.1128/msystems.00315-23 01、研究背景 与空白土壤相比,植物根系和根际细菌之间的相互作用调节了氮(N)的循环过程,并创造了富含低分…

最近非常火的电子木鱼流量主小程序源码系统 带完整搭建教程

在当今数字化时代,人们对于休闲娱乐的需求越来越高。近年来,一种结合了传统文化和现代科技的新型休闲娱乐方式——电子木鱼,迅速在年轻人群中流行开来。电子木鱼流量主小程序源码系统的出现,为这种新型娱乐方式提供了更加便捷的途…

吴恩达《机器学习》4-1->4-5:多变量线性回归

一、引入多维特征 在多维特征中,我们考虑的不再是单一的特征,而是一组特征,例如房价模型中可能包括房间数、楼层等多个特征。这些特征将组成一个向量,表示为(𝑥₁, 𝑥₂, . . . , 𝑥ₙ)&#x…

记一次pdjs时安装glob出现,npm ERR! code ETARGET和npm ERR! code ELIFECYCLE

如往常一样,我使用pdjs来编译proto文件,但出现了以下报错: 大致就是pdjs的util在尝试执行npm install glob^7.2.1 escodegen^1.13.0时出错了 尝试手动执行安装,escodegen被正确安装,但glob^7.2.1出错 npm ERR! code E…

陕西某小型水库雨水情测报及大坝安全监测项目案例

项目背景 根据《陕西省小型病险水库除险加固项目管理办法》、《陕西省小型水库雨水情测报和大坝安全监测设施建设与运行管理办法》的要求,为保障水库安全运行,对全省小型病险水库除险加固,建设完善雨水情测报、监测预警、防汛道路、通讯设备、…

安装anaconda时控制台conda-version报错

今天根据站内的一篇博客教程博客在此安装anaconda时&#xff0c;检查conda版本时报错如下&#xff1a; >>>>>>>>>>>> ERROR REPORT <<<<<<<<<<<< Traceback (most recent call last): File “D:\An…

办公套件全家桶 Office2019 mac中文版新功能

office 2019 mac是 Microsoft office 应用程序套件的最新版本。它包括流行的软件&#xff0c;例如 Microsoft Word、Excel、PowerPoint 和 Outlook&#xff0c;office 2019 比其前身有许多新功能和改进&#xff0c;包括增强的协作工具、与 OneDrive 和 SharePoint 等云服务的更…

MTK 拨打紧急电话接通时间过长问题分析

1、问题分析 从Log视频来看&#xff0c;通话接通时间过长&#xff0c;但是Modem Log来看&#xff0c;进行多两次拨号。 查看AP代码确实进行了两次拨号 AP界面查看确实只有一路通话 查看MTK原始代码&#xff0c;发现当紧急拨号失败后&#xff0c;上层换卡重试&#xff0c;界面不…

ActiveMq学习⑨__基于zookeeper和LevelDB搭建ActiveMQ集群

引入消息中间件后如何保证其高可用&#xff1f; 基于zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能&#xff0c;避免单点故障。 http://activemq.apache.org/masterslave LevelDB&#xff0c;5.6版本之后推出了LecelDB的持久化引擎&#xff0c;它使…

【Windows-软件-FFmpeg】(01)通过CMD运行FFmpeg进行操作,快速上手

前言 通过"cmd"运行"ffmpeg"进行操作&#xff0c;快速上手&#xff1b; 实操 【实操一】 说明 使用"ffmpeg"来合并音频文件和视频文件 &#xff1b; 环境 Windows 11 专业版&#xff08;22621.2428&#xff09;&#xff1b; 代码 &#xf…

【入门Flink】- 04Flink部署模式和运行模式【偏概念】

部署模式 在一些应用场景中&#xff0c;对于集群资源分配和占用的方式&#xff0c;可能会有特定的需求。Flink为各种场景提供了不同的部署模式&#xff0c;主要有以下三种&#xff1a;会话模式&#xff08;Session Mode&#xff09;、单作业模式&#xff08;Per-Job Mode&…