Spark SQL

目录

一、Spark SQL简介

(一)从Shark说起

(二)Spark SQL架构

(三)为什么推出Spark SQL

二、DataFrame概述

三、DataFrame的创建

四、DataFrame的保存

五、DataFrame的常用操作

六、从RDD转换得到DataFrame

(一)利用反射机制推断RDD模式

(二)使用编程方式定义RDD模式

七、使用Spark SQL读写数据库

(一)准备工作

(二)读取MySQL数据库中的数据

(三)向MySQL数据库写入数据


一、Spark SQL简介

(一)从Shark说起

        Hive是一个基于Hadoop 的数据仓库工具,提供了类似于关系数据库SQL的查询语言HiveQL,用户可以通过HiveQL语句快速实现简单的MapReduce统计,Hive 自身可以自动将HiveQL语句快速转换成MapReduce 任务进行运行。当用户向Hive输入一段命令或查询(即HiveQL 语句)时, Hive需要与Hadoop交互来完成该操作。该命令或查询首先进入到驱动模块,由驱动模块中的编译器进行解析编译,并由优化器对该操作进行优化计算,然后交给执行器去执行,执行器通常的任务是启动一个或多个MapReduce任务。如图所示描述了用户提交一段SQL查询后,Hive把sQL 语句转化成MapReduce任务进行执行的详细过程。

        Shark即Hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作。

        Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高。

        Shark的设计导致了两个问题: 一是执行计划优化完全依赖于Hive,不方便添加新的优化策略
二是因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。

        2014年6月1日Shark项目和Spark SQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放在Spark SQL项目上,至此,Shark的发展画上了句号,但也因此发展出两个分支:Spark SQL和Hive on Spark。Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

(二)Spark SQL架构

        Spark SQL架构如图所示,Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责。

        Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据 Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范。

(三)为什么推出Spark SQL

        

        关系数据库已经很流行 关系数据库在大数据时代已经不能满足要求 首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据 其次,用户需要执行高级分析,比如机器学习和图像处理 在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统。

        Spark SQL填补了这个鸿沟: 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系型操作 其次,可以支持大数据中的大量数据源和数据分析算法 Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力。

二、DataFrame概述

        Spark SQL所使用的数据抽象并非RDD,而是DataFrame。DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能 Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询。        

        RDD是分布式的 Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的 DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息。如图所示为RDD和DataFrame的区别。

三、DataFrame的创建

        从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。
        SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。

可以通过如下语句创建一个SparkSession对象:

>>> from pyspark import SparkContext,SparkConf
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

        实际上,在启动进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)。

        在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame。例如:
spark.read.text("people.txt"):读取文本文件people.txt创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径。
spark.read.json("people.json"):读取people.json文件创建DataFrame。
spark.read.parquet(“people.parquet”):读取people.parquet文件创建DataFrame。
或者也可以使用如下格式的语句:
spark.read.format("text").load("people.txt"):读取文本文件people.json创建DataFrame。 spark.read.format("json").load("people.json"):读取JSON文件people.json创建DataFrame。 spark.read.format("parquet").load("people.parquet"):读取Parquet文件people.parquet创建DataFrame。

一个实例:

在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。

people.json文件的内容如下:

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

people.txt文件的内容如下:

Michael, 29

Andy, 30

Justin, 19

>>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

四、DataFrame的保存

        可以使用spark.write操作,把一个DataFrame保存成不同格式的文件,例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下:
df.write.text("people.txt")
df.write.json("people.json")
df.write.parquet("people.parquet")
或者也可以使用如下格式的语句:
df.write.format("text").save("people.txt")
df.write.format("json").save("people.json")
df.write.format ("parquet").save("people.parquet")

        下面从示例文件people.json中创建一个DataFrame,名称为peopleDF,把peopleDF保存到另外一个JSON文件中,然后,再从peopleDF中选取一个列(即name列),把该列数据保存到一个文本文件中。

>>> peopleDF = spark.read.format("json").\
... load("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> peopleDF.select("name", "age").write.format("json").\
... save("file:///home/zhc/mycode/sparksql/newpeople.json")
>>> peopleDF.select("name").write.format("text").\
... save("file:///home/zhc/mycode/sparksql/newpeople.txt")

然后到“/home/zhc/mycode/sparksql/”路径下可以看到生成一个名称为newpeople.json的目录(不是文件)和一个名称为newpeople.txt的目录(不是文件)。

五、DataFrame的常用操作

可以执行一些常用的DataFrame操作,先创建一个DataFrame:

>>> df=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")

1、printSchema()

>>> df.printSchema()

2、select()

>>> df.select(df["name"],df["age"]+1).show()

3、filter()

>>> df.filter(df["age"]>20).show()

4、groupBy()

>>> df.groupBy("age").count().show()

5、sort()

>>> df.sort(df["age"].desc()).show()

六、从RDD转换得到DataFrame

(一)利用反射机制推断RDD模式

        利用反射机制来推断包含特定类型对象的RDD的模式(Schema),适用于数据结构已知时的RDD转换。

        在“/usr/local/spark/examples/src/main/resources/”目录下,有个Spark安装时自带的样例数据people.txt,其内容如下:

Michael, 29

Andy, 30

Justin, 19

        现在要把people.txt加载到内存中生成一个DataFrame,并查询其中的数据。执行代码如下:

>>> from pyspark.sql import Row
>>> people = spark.sparkContext.\
... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").\
... map(lambda line: line.split(",")).\
... map(lambda p: Row(name=p[0], age=int(p[1])))
>>> schemaPeople = spark.createDataFrame(people)
#必须注册为临时表才能供下面的查询使用
>>> schemaPeople.createOrReplaceTempView("people") 
>>> personsDF = spark.sql("select name,age from people where age > 20")
#DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值
>>> personsRDD=personsDF.rdd.map(lambda p:"Name: "+p.name+ ","+"Age: "+str(p.age))
>>> personsRDD.foreach(print)
Name: Michael,Age: 29
Name: Andy,Age: 30

(二)使用编程方式定义RDD模式

        使用编程接口构造一个模式(Schema),并将其应用在已知的RDD上,适用于数据结构未知的RDD转换。

        当无法提前获知数据结构时,就需要采用编程方式定义RDD模式。比如,现在需要通过编程方式把“/usr/local/spark/examples/src/main/resources/people.txt”加载进来生成DataFrame,并完成SQL查询。步骤如下:

        下面是利用Spark SQL查询people.txt的完整代码:

>>> from pyspark.sql.types import *
>>> from pyspark.sql import Row
#下面生成“表头”
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
>>> schema = StructType(fields)
#下面生成“表中的记录”
>>> lines = spark.sparkContext.\
... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(p[0], p[1].strip()))
#下面把“表头”和“表中的记录”拼装在一起
>>> schemaPeople = spark.createDataFrame(people, schema)
#注册一个临时表供下面查询使用
>>> schemaPeople.createOrReplaceTempView("people")
>>> results = spark.sql("SELECT name,age FROM people")
>>> results.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

七、使用Spark SQL读写数据库

        Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源。

(一)准备工作

        在Linux系统中安装MySQL数据库的方法,可以参照我上一篇博客。

在Linux系统中安装MySQL数据库-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/Morse_Chen/article/details/135154114        安装成功后,在Linux终端启动MySQL数据库,命令如下:

[root@bigdata zhc]# systemctl start mysqld.service
[root@bigdata zhc]# mysql -u root -p       #屏幕会提示你输入密码

输入下面SQL语句完成数据库和表的创建:

mysql> create database spark;mysql> use spark;mysql> create table student (id int(4), name char(20), gender char(4), age int(4));mysql> insert into student values(1,'Xueqian','F',23);mysql> insert into student values(2,'Weiliang','M',24);mysql> select * from student;

        为了让Spark能够顺利连接MySQL数据库,还需要MySQL数据库驱动程序。可以上网查找下载MySQL的JDBC驱动程序。下载MySQL的JDBC驱动程序,比如mysql-connector-java-5.1.40.tar.gz 。把该驱动程序解压出mysql-connector-java-5.1.40-bin.jar文件,并将其拷贝到spark的安装目录“/usr/local/spark/jars”下。完成以上操作后,再启动进入pyspark。

(二)读取MySQL数据库中的数据

        启动进入pyspark后,执行以下命令连接数据库,读取数据,并显示:

>>> jdbcDF = spark.read.format("jdbc") \.option("driver","com.mysql.jdbc.Driver") \.option("url", "jdbc:mysql://localhost:3306/spark") \.option("dbtable", "student") \.option("user", "root") \.option("password", "123456") \.load()
>>> jdbcDF.show()
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+

(三)向MySQL数据库写入数据

        在MySQL数据库中已经创建了一个名称为spark的数据库,并创建了一个名称为student的表 创建后,查看一下数据库内容:

        现在开始编写程序,创建一个“/home/zhc/mycode/sparksql/InsertStudent.py”,往spark.student表中插入两条记录。

#/home/zhc/mycode/sparksql/InsertStudent.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age", IntegerType(), True)])
#下面设置两条数据,表示两个学生的信息
studentRDD = spark.sparkContext.parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]).map(lambda x:x.split(" "))#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
studentDF = spark.createDataFrame(rowRDD, schema)#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark?useSSL=false",'student','append', prop)

通过spark-submit提交上述代码文件:

[root@bigdata sparksql]# spark-submit InsertStudent.py 

        执行上述代码后,可以看一下效果,在MySQL Shell环境中使用SQL查询spark.student表发生了什么变化。

另外,解决一下在运行上述代码时,可能出现的问题:

很显然,上图中运行代码时抛出了异常。

这是因为与MySQL数据库的SSL连接失败了,我们只需要将数据源的URL后面添加**?useSSL=false**就可以解决,也就是禁用SSL:

再次运行代码,就OK了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/227427.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs+vue+ElementUi农产品团购销售系统zto2c

目标是为了完成小区团购平台的设计和实现,在疫情当下的环境,方便小区业主购入生活所需,减小居民的生活压力 采用B/S模式架构系统,开发简单,只需要连接网络即可登录本系统,不需要安装任何客户端。开发工具采…

mxxWechatBot微信机器人V2版本文档说明

大家伙,我是雄雄,欢迎关注微信公众号:雄雄的小课堂。 先看这里 一、前言二、mxxWechatBot流程图三、怎么使用? 一、前言 经过不断地探索与研究,mxxWechatBot正式上线,届时全面开放使用。 mxxWechatBot&am…

LLM之RAG实战(十一)| 使用Mistral-7B和Langchain搭建基于PDF文件的聊天机器人

在本文中,使用LangChain、HuggingFaceEmbeddings和HuggingFace的Mistral-7B LLM创建一个简单的Python程序,可以从任何pdf文件中回答问题。 一、LangChain简介 LangChain是一个在语言模型之上开发上下文感知应用程序的框架。LangChain使用带prompt和few-…

小米电脑管家 - 手机平板电脑家居互联

系列文章目录 前言 联想电脑安装小米电脑管家实现设备互联 如图,将 小米平板 5 Pro 作为联想笔记本 GeekPro 5000 (这垃圾电脑)的副屏。 可以在小米平板控制笔记本,如图所示 一、官方使用手册 参考:小米电脑管家帮助 …

听GPT 讲Rust源代码--src/tools(34)

File: rust/src/tools/clippy/clippy_lints/src/collection_is_never_read.rs 文件"collection_is_never_read.rs"位于Rust源代码中的clippy_lints工具中,其作用是检查在集合类型(如Vec、HashMap等)的实例上执行的操作是否被忽略了…

学习笔记13——Spring整合Mybatis、junit、AOP、事务

学习笔记系列开头惯例发布一些寻亲消息 链接:https://baobeihuijia.com/bbhj/ Mybatis - Spring(使用第三方包new一个对象bean) 原始的Mybatis与数据库交互【通过sqlmapconfig来配置和连接】 初始化SqlSessionFactory获得连接获取数据层接口…

Jmeter吞吐量控制器总结

吞吐量控制器(Throughput Controller) 场景: 在同一个线程组里, 有10个并发, 7个做A业务, 3个做B业务,要模拟这种场景,可以通过吞吐量模拟器来实现。 添加吞吐量控制器 用法1: Percent Executions 在一个线程组内分别建立两个吞吐量控制器, 分别放业务A和业务B …

【三维目标检测/自动驾驶】IA-BEV:基于结构先验和自增强学习的实例感知三维目标检测(AAAI 2024)

系列文章目录 论文:Instance-aware Multi-Camera 3D Object Detection with Structural Priors Mining and Self-Boosting Learning 地址:https://arxiv.org/pdf/2312.08004.pdf 来源:复旦大学 英特尔Shanghai Key Lab /美团 文章目录 系列文…

数据预处理时,怎样处理类别型特征?

1. 序号编码 序号编码通常用于处理类别间具有大小关系的数据。例如成绩,可以分为低、中、高三档,并且存在“高>中>低”的排序关系。序号编码会按照大小关系对类别型特征赋予一个数值ID,例如高表示为3、中表示为2、低表示为1&#xff0…

Spring系列学习四、Spring数据访问

Spring数据访问 一、Spring中的JDBC模板介绍1、新建SpringBoot应用2、引入依赖:3、配置数据库连接,注入dbcTemplate对象,执行查询:4,测试验证: 二、整合MyBatis Plus1,在你的项目中添加MyBatis …

elasticsearch系列三:常用查询语法

概述 前几篇我们介绍了如何在es中存储数据,如何更加合理的存储数据,今天我们来说下常用的查询语法,如何实现mysql中的等于、大于、小于、and 、or、in等方式。 案例 我们以kibana为例,比如sql中的等于,在es中可以用…

【1】Docker详解与部署微服务实战

Docker 详解 Docker 简介 Docker 是一个开源的容器化平台,可以帮助开发者将应用程序和其依赖的环境打包成一个可移植、可部署的容器。Docker 的主要目标是通过容器化技术实现应用程序的快速部署、可移植性和可扩展性,从而简化应用程序的开发、测试和部…

计算机组成原理之BCD码和奇偶校验码小白秒懂

BCD码简介 原文文档下载https://download.csdn.net/download/m0_46579394/88681870 BCD码也称二进码十进数,BCD码可分为有权码和无权码两类。其中,常见的有权BCD码有8421码、2421码、5421码,无权BCD码有余3码、余3循环码、格雷码。8421BCD码…

数据分析硬核工具Origin各版本安装指南

下载链接 https://pan.baidu.com/s/12mENFtRFdNaLzVKmE6w_Uw?pwd0531 1.鼠标右击【Origin 2022(64bit)】压缩包(win11及以上系统需先点击显示更多“选项”)选择【解压到 Origin 2022(64bit)】。 2.双击打开解压后的【Origin 2022(64bit)】文件夹。 3.…

最新AI系统ChatGPT网站H5系统源码,支持AI绘画,GPT语音对话+ChatFile文档对话总结+DALL-E3文生图

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统,支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美,可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

【基础篇】七、线程上下文类加载器打破双亲委派机制

文章目录 1、SPI机制2、JDBC案例之SPI机制3、打破双亲委派机制:线程上下文类加载器4、打破双亲委派机制:osgi模块化5、JDK9之后的类加载器6、小总结 1、SPI机制 SPI,Service Provider Interface,是JDK内置的一种服务提供发现机制…

NModbus-一个C#的Modbus协议库实现

NModbus-一个基于C#实现的Modbus通信协议库 最近在学习C#的时候,因为之前做过环保设备时使用C做过环保设备采集使用到了Modbus协议,当时看了一下基于C语言开发的libmodbus库。所以特意搜索看了一下C#下有什么Modbus协议库,在Github上面找了一…

Strateg策略模式(组件协作)

策略模式(组件协作) 链接:策略模式实例代码 注解 目的 正常情况下,一个类/对象中会包含其所有可能会使用的内外方法,但是一般情况下,这些常使用的类都是由不同的父类继承、组合得来的,来实现…

Feign远程调用

Feign远程调用 Fegin的使用步骤如下&#xff1a; 1&#xff09;引入依赖 我们在order-service服务的pom文件中引入feign的依赖&#xff1a; <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign&…

SadTalker数字人增加视频输出mp4质量精度

最近在用数字人简易方案&#xff0c;看到了sadtalker虽然效果差&#xff0c;但是可以作为一个快速方案&#xff0c;没有安装sd的版本&#xff0c;随便找了个一键安装包 设置如上 使用倒是非常简单&#xff0c;但是出现一个问题&#xff0c;就是输出的mp4都出马赛克了 界面上却…