SparkSQL编程入口和模型与SparkSQL基本编程

SparkSQL编程入口和模型

SparkSQL编程模型

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

1)SQL:SQL不用多说,就和Hive操作一样,但是需要清楚一点的是,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。同时支持,通用sql和hivesql。

2)DSL(DataFrame&DataSet):在支持SQL编程的同时,方便大家使用函数式编程的思想,类似sparkcore的编程模式,sparksql也支持DSL(Domain Specified Language,领域专用语言,或者特定领域语言),即通过DataFrame和Dataset来支持类似RDD的编程。

DataFrame和Dataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。

Dataset是在spark1.6.2开始出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。

一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

RDD V.S. DataFrame V.S. Dataset

RDD

弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法。从字面上就能看出的几个特点:

1)弹性:数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换。RDD出错后可自动重新计算(通过血缘自动容错)。可checkpoint(设置检查点,用于容错),可persist或cache(缓存),里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整。

2)分布式:RDD中的数据可存放在多个节点上。

3)数据集:即数据的集合,相对于DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)调整。

DataFrame

理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)。

假设RDD中的两行数据长这样,如图-5所示。

图-5 rdd数据

那么DataFrame中的数据长这样,如图-6所示。

图-6 dataframe数据

从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...)。

有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。

不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

Dataset:相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束,下图-7是官网对于dataset的表述。

图-7 dataset

假设RDD中的两行数据如同-5所示,那么Dataset中的数据长这样,如图-8所示。

图-8 dataset数据

或者也可以如图-9所示,其中每行数据是个Object。

图-9 dataset数据

使用Dataset API的程序,会经过Spark SQL的优化器进行优化(优化器叫什么还记得吗?)

目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是不方便,这也是引入Dataset的一个重要原因。

使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如图-10代码所示。

图-10 dataframe编码

而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前,如下图-11所示。

图-11 dataset编码

SparkSession

在SparkSQL中的编程模型,不再是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。

SparkSQL基本编程

SparkSQL编程初体验

1)SparkSession的构建:

val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")//.enableHiveSupport()//支持hive的相关操作.getOrCreate()

2)基本编程:

object SparkSQLOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")//.enableHiveSupport()//支持hive的相关操作.getOrCreate()//加载数据val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")//二维表结构pdf.printSchema()//数据内容 select * from tblpdf.show()//具体的查询 select name, age from tblpdf.select("name", "age").show()//导入sparksession中的隐式转换操作,增强sql的功能import spark.implicits._pdf.select($"name",$"age").show()//列的运算,给每个人的年龄+10 select name, age+10,height-1 from tblpdf.select($"name",$"height" - 1, new Column("age").+(10)).show()//起别名select name, age+10 as age,height-1  as height from tblpdf.select($"name",($"height" -1).as("height")).show()//做聚合统计 统计不同年龄的人数select age, count(1) counts from tbl group by agepdf.select($"age").groupBy($"age").count().show()//条件查询 获取年龄超过18的用户//pdf.select("name", "age", "height").where($"age".>(18)).show()pdf.select("name", "age", "height").where("age > 18").show()//sql风格//pdf.registerTempTable()//在spark2.0之后处于维护状态,使用createOrReplaceTempView/*从使用范围上说,分为global和非globalglobal是当前SparkApplication中可用,非global只在当前SparkSession中可用从创建的角度上说,分为createOrReplace和不ReplacecreateOrReplace会覆盖之前的数据create不Replace,如果视图存在,会报错*/pdf.createOrReplaceTempView("people")spark.sql("""|select| age,| count(1) as countz|from people|group by age""".stripMargin).showspark.stop()}}

SparkSQL编程模型的操作

DataFrame的构建方式

在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口,创建DataFrames有三种方式,一种是可以从一个存在的RDD进行转换,还可以从Hive Table进行查询返回,或者通过Spark的数据源进行创建。

从Spark数据源进行创建:

package chapter1
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
object Create_DataFrame {def main(args: Array[String]): Unit = {//创建程序入口val spark = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//从数据源创建DataFrameval personDF = spark.read.json("resources/people.json")//展示数据personDF.show()}
}

从RDD进行转换:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
object Create_DataFrame1 {def main(args: Array[String]): Unit = {//创建程序入口val spark= SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val file: RDD[String] = sc.textFile("E:\\资料\\data\\person.txt")//按照分隔符进行切分val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))//指定字段类型val personRDD: RDD[(Int, String, Int)] = spliFile.map(line=>(line(0).toInt,line(1),line(2).toInt))//调用toDF方法指定列名val personDF: DataFrame = personRDD.toDF("id","name","age")//展示数据personDF.show()//释放资源spark.stop()sc.stop()}
}

通过反射创建DataFrame:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
case class person(id:Int,name:String,age:Int)
object createDataFrame2 {def main(args: Array[String]): Unit = {//创建程序入口val spark = SparkSession.builder().appName("createDF").master("local[*]").getOrCreate()//调用sparkContextval sc: SparkContext = spark.sparkContext//设置控制台日志输出级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val file: RDD[String] = sc.textFile("E:\\资料\\data\\person.txt")//按照分隔符进行切分val spliFile: RDD[Array[String]] = file.map(line=>line.split(" "))//指定字段类型val personRDD: RDD[person] = spliFile.map(line=>person(line(0).toInt,line(1),line(2).toInt))//调用toDF方法指定列名val personDF: DataFrame = personRDD.toDF()//展示数据personDF.show()//释放资源spark.stop()sc.stop()}
}

动态编程:

/*使用动态编程的方式构建DataFrameRow-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象*/val row:RDD[Row] = spark.sparkContext.parallelize(List(Row(1, "李伟", 1, 180.0),Row(2, "汪松伟", 2, 179.0),Row(3, "常洪浩", 1, 183.0),Row(4, "麻宁娜", 0, 168.0)))//表对应的元数据信息val schema = StructType(List(StructField("id", DataTypes.IntegerType, false),StructField("name", DataTypes.StringType, false),StructField("gender", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)))val df = spark.createDataFrame(row, schema)df.printSchema()df.show()

说明,这里学习三个新的类:

1)Row:代表的是二维表中的一行记录,或者就是一个Java对象。

2)StructType:是该二维表的元数据信息,是StructField的集合。

3)StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)。

Dataset的构建方式

Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。

//dataset的构建object SparkSQLDatasetOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkSQLDataset").master("local[*]").getOrCreate()//dataset的数据集val list = List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29))import spark.implicits._val ds = spark.createDataset[Student](list)ds.printSchema()ds.show()spark.stop()}}case class Student(id:Int, name:String, gender:Int, age:Int)

在编码中需要注意的是,如果导入spark.implicits隐式转换或者数据类型不是case class,便会出现如图-12所示的bug。

图-12 dataset编码注意的问题

在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过。

RDD和DataFrame以及DataSet的互相转换

RDD→DataFrame:

def beanRDD2DataFrame(spark:SparkSession): Unit = {val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29)))val sdf =spark.createDataFrame(stuRDD, classOf[Student])sdf.printSchema()sdf.show()}

RDD→Dataset:

Def rdd2Dataset(spark:SparkSession): Unit = {val stuRDD = spark.sparkContext.parallelize(List(Student(1, "王盛芃", 1, 19),Student(2, "李金宝", 1, 49),Student(3, "张海波", 1, 39),Student(4, "张文悦", 0, 29)))import spark.implicits._val ds:Dataset[Student] = spark.createDataset[Student](stuRDD)ds.show()}case class Student(id:Int, name:String, gender:Int, age:Int)

RDD转换为DataFrame和Dataset的时候可以有更加简单的方式,如下:

import spark.implicits._rdd.toDF()rdd.toDS()DataFrame→RDD:val rdd:RDD[Row] = df.rddrdd.foreach(row => {val id = row.getInt(0)val name = row.getString(1)val gender = row.getInt(2)val height = row.getAs[Double]("height")println(s"id=${id},name=$name,gender=$gender,height=$height")})

Dataset→RDD:

val stuDS: Dataset[Student] = list2Dataset(spark)val stuRDD:RDD[Student] = stuDS.rddstuRDD.foreach(println)Dataset→DataFrame:val stuDS: Dataset[Student] = list2Dataset(spark)      //dataset --->dataframeval df:DataFrame = stuDS.toDF()df.show()

DataFrame→Dataset:无法直接将DataFrame转化为Dataset,需要通过as方法添加泛型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/324301.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【北京迅为】《iTOP-3588开发板从零搭建ubuntu环境手册》-第2章 获取并安装Ubuntu操作系统

RK3588是一款低功耗、高性能的处理器,适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用,RK3588支持8K视频编解码,内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP&…

网络编程套接字 (二)---udosocket

本专栏内容为:Linux学习专栏,分为系统和网络两部分。 通过本专栏的深入学习,你可以了解并掌握Linux。 💓博主csdn个人主页:小小unicorn ⏩专栏分类:网络 🚚代码仓库:小小unicorn的代…

力扣每日一题-统计已测试设备-2024.5.10

力扣题目:统计已测试设备 题目链接: 2960.统计已测试设备 题目描述 代码思路 根据题目内容,第一感是根据题目模拟整个过程,在每一步中修改所有设备的电量百分比。但稍加思索,发现可以利用已测试设备的数量作为需要减少的设备电…

前端组件库图片上传时候做自定义裁剪操作

不论是vue还是react项目,我们在使用antd组件库做上传图片的时候,有一个上传图片裁剪的功能,但是这个功能默认是只支持1:1的裁剪操作,如何做到自定义的裁剪操作?比如显示宽高比?是否可以缩放和旋转操作&…

Leetcode—239. 滑动窗口最大值【困难】

2024每日刷题&#xff08;132&#xff09; Leetcode—239. 滑动窗口最大值 算法思想 用vector会超时的&#xff0c;用deque最好&#xff01; 实现代码 class Solution { public:vector<int> maxSlidingWindow(vector<int>& nums, int k) {deque<int> …

学习笔记:【QC】Android Q qmi扩展nvReadItem/nvWriteItem

一、qmi初始化 流程图 初始化流程: 1、主入口&#xff1a; vendor/qcom/proprietary/qcril-hal/qcrild/qcrild/rild.c int main(int argc, char **argv) { const RIL_RadioFunctions *(*rilInit)(const struct RIL_Env *, int, char **); rilInit RIL_Init; funcs rilInit…

react18【系列实用教程】JSX (2024最新版)

为什么要用 JSX&#xff1f; JSX 给 HTML 赋予了 JS 的编程能力 JSX 的本质 JSX 是 JavaScript 的语法扩展&#xff0c;浏览器本身不能识别&#xff0c;需要通过解析工具&#xff08;如babel&#xff09;解析之后才能在浏览器中运行。 bable 官网可以查看解析过程 JSX 的语法 …

【Java orm 框架比较】十 新增hammer_sql_db 框架对比

迁移到&#xff08;https://gitee.com/wujiawei1207537021/spring-orm-integration-compare&#xff09; orm框架使用性能比较 比较mybatis-plus、lazy、sqltoy、mybatis-flex、easy-query、mybatis-mp、jpa、dbvisitor、beetlsql、dream_orm、wood、hammer_sql_db 操作数据 …

【MySQL】SQL基本知识点DDL(1)

目录 1.SQL分类&#xff1a; 2.DDL-数据库操作 3.DDL-表操作-创建 4.DDL-表操作-查询 5.DDL-表操作-数据类型 6.DDL-表操作-修改 1.SQL分类&#xff1a; 2.DDL-数据库操作 3.DDL-表操作-创建 注意&#xff1a;里面的符号全部要切换为英文状态 4.DDL-表操作-查询 5.DDL…

Django开发实战之定制管理后台界面及知识梳理(中)

上一篇文章末尾讲到如何能够展示更多的字段在界面上&#xff0c;那么针对整个界面数据&#xff0c;如果我想按照某一个条件进行筛选&#xff0c;我该怎么做呢&#xff0c;只需要加上下面一行代码 注意&#xff1a;中途只有代码片段&#xff0c;文末有今天涉及的所有代码 1、增…

【Python可视化】pyecharts

Echarts 是一个由百度开源的数据可视化&#xff0c;凭借着良好的交互性&#xff0c;精巧的图表设计&#xff0c;得到了众多开发者的认可。而 Python 是一门富有表达力的语言&#xff0c;很适合用于数据处理。当数据分析遇上数据可视化时&#xff0c;pyecharts 诞生了。 需要安…

Lazada、Shopee测评自养号,快速出单技巧全解析!

每个人都憧憬着自己的店铺能够拥有一款或多款引人注目的热销商品&#xff0c;这些商品不仅能为店铺带来可观的收益&#xff0c;更重要的是它们能够成为吸引顾客的强大磁石&#xff0c;显著提升店铺的整体流量。一旦这样的爆款商品成功吸引顾客&#xff0c;其他产品也将随之受到…

PHP 框架安全:ThinkPHP 序列 漏洞测试.

什么是 ThinkPHP 框架. ThinkPHP 是一个流行的国内 PHP 框架&#xff0c;它提供了一套完整的安全措施来帮助开发者构建安全可靠的 web 应用程序。ThinkPHP 本身不断更新和改进&#xff0c;以应对新的安全威胁和漏洞。 目录&#xff1a; 什么是 ThinkPHP 框架. ThinkPHP 框架…

ASP.NET学生成绩管理系统

摘要 本系统依据开发要求主要应用于教育系统&#xff0c;完成对日常的教育工作中学生成绩档案的数字化管理。开发本系统可使学院教职员工减轻工作压力&#xff0c;比较系统地对教务、教学上的各项服务和信息进行管理&#xff0c;同时&#xff0c;可以减少劳动力的使用&#xf…

uniapp、web网页跨站数据交互及通讯

来来来&#xff0c;说说你的创作灵感&#xff01;这就跟吃饭睡觉一样&#xff0c;饿了就找吃的&#xff0c;渴了就倒水张口灌。 最近一个多月实在是忙的没再更新日志&#xff0c;好多粉丝私信说之前的创作于他们而言非常有用&#xff01;受益菲浅&#xff0c;这里非常感谢粉丝…

阿里云ECS服务器实例挂载数据盘步骤(磁盘自动挂载.、访问挂载点)

阿里云ECS服务器实例挂载数据盘步骤 1.磁盘自动挂载 首先登录阿里云ECS服务器&#xff0c;通过 df -h 命令查看当前磁盘挂载情况 通过 fdisk -l 命令查看磁盘情况&#xff0c;可以发现有两个盘&#xff1a; 系统盘 /dev/vda: 60GB&#xff0c; 数据盘 /dev/vdb: 500GB 使用…

反了!美国假冒邮政服务钓鱼网站访问量竟然超过正规官网

美国邮政是美国主要的包裹信件投递机构之一&#xff0c;长期以来该单位都是网络钓鱼和诈骗的针对目标。对美国公民来说&#xff0c;在假期通常都会收到声称来自美国邮政的诈骗。美国邮政甚至单独建设的网页提醒消费者警惕诈骗信息&#xff1a; 专用提醒网页 Akamai 的研究人员…

linux上用Jmter进行压测

在上一篇中安装好了Jmeter环境&#xff0c;在这一篇中将主要分享如何使用jmeter在linux中进行单机压测。 1.项目部署 在这里我们先简单部署一下测试环境&#xff0c;所用到的项目环境是个jar包&#xff0c;先在linux上home目录下新建app目录&#xff0c;然后通过rz命令将项目ja…

【c++】string深度刨析以及实现

#pragma once #include<iostream> using namespace std; #include<assert.h> namespace bite {class string{public://迭代器 //像指针 底层不一定是指针 typedef char* iterator;iterator begin(){return _str;}iterator end(){return _str _size;}//const 版本…

面试集中营—JVM篇

一、JVM内存模型 线程独占&#xff1a;栈&#xff0c;本地方法栈&#xff0c;程序计数器; 线程共享&#xff1a;堆&#xff0c;方法区 虚拟机栈&#xff1a;线程私有的&#xff0c;线程执行方法是会创建一个栈阵&#xff0c;用来存储局部变量表&#xff0c;操作栈&#xff0c;…