SparkSQL的编程模型(DataFrame和DataSet)

1.2 SparkSQL的编程模型(DataFrame和DataSet)

1.2.1  编程模型简介

主要通过两种方式操作SparkSQL,一种就是SQL,另一种为DataFrame和Dataset。

  • SQL SQL不用多说,就和Hive操作一样,但是需要清楚一点的时候,SQL操作的是表,所以要想用SQL进行操作,就需要将SparkSQL对应的编程模型转化成为一张表才可以。 同时支持,通用SQL和HQL。

  • DataFrame和Dataset DataFrameDataset是SparkSQL中的编程模型。DataFrame和Dataset我们都可以理解为是一张mysql中的二维表,表有什么?表头,表名,字段,字段类型。RDD其实说白了也是一张二维表,但是这张二维表相比较于DataFrame和Dataset却少了很多东西,比如表头,表名,字段,字段类型,只有数据。 Dataset是在spark1.6.2开始出现出现的api,DataFrame是1.3的时候出现的,早期的时候DataFrame叫SchemaRDD,SchemaRDD和SparkCore中的RDD相比较,就多了Schema,所谓约束信息,元数据信息。 一般的,将RDD称之为Spark体系中的第一代编程模型;DataFrame比RDD多了一个Schema元数据信息,被称之为Spark体系中的第二代编程模型;Dataset吸收了RDD的优点(强类型推断和强大的函数式编程)和DataFrame中的优化(SQL优化引擎,内存列存储),成为Spark的最新一代的编程模型。

1.2.2 RDD\DataSet\DataFrame

RDD

弹性分布式数据集,是Spark对数据进行的一种抽象,可以理解为Spark对数据的一种组织方式,更简单些说,RDD就是一种数据结构,里面包含了数据和操作数据的方法

从字面上就能看出的几个特点:

  • 弹性:

    数据可完全放内存或完全放磁盘,也可部分存放在内存,部分存放在磁盘,并可以自动切换

    RDD出错后可自动重新计算(通过血缘自动容错)

    可checkpoint(设置检查点,用于容错),可persist或cache(缓存)

    里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整

  • 分布式:

    RDD中的数据可存放在多个节点上

  • 数据集:

    数据的集合,没啥好说的

相对于与DataFrame和Dataset,RDD是Spark最底层的抽象,目前是开发者用的最多的,但逐步会转向DataFrame和Dataset(当然,这是Spark的发展趋势)

DataFrame

DataFrame:理解了RDD,DataFrame就容易理解些,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)

假设RDD中的两行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

从上面两个表格可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了,DataFrame还配套了新的操作数据的方法等,有了DataFrame这个高一层的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了,对开发者来说,易用性有了很大的提升。,不仅如此,通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不高效,也可以运行的很快。

Dataset

相对于RDD,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束

假设RDD中的两行数据长这样;

1张三20
2李四21
3王五22

那么在DataFrame中数据就变成这样;

ID:IntName:StringAge:Int
1张三20
2李四21
3王五22

那么在DataSet中数据就变成这样;

Person(id:Int,Name:String,Age:Int)
Person(1,张三,20)
Person(2,李四,21)
Person(3,王五,22)
目前仅支持Scala、Java API,尚未提供Python的API(所以一定要学习Scala),相比DataFrame,Dataset提供了编译时类型检查,对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),到提交到集群运行时才发现错误,实在是想骂人,这也是引入Dataset的一个重要原因。
​
使用DataFrame的代码中json文件中并没有score字段,但是能编译通过,但是运行时会报异常!如下图代码所示.
val df1 = spark.read.json( "/tmp/people.json")
// json文件中没有score字段,但是能编译通过
val df2 = df1.filter("score > 60")df2.show()

而使用Dataset实现,会在IDE中就报错,出错提前到了编译之前

val ds1 = spark.read.json( "/tmp/people.json" ).as[ People]
// 使用dataset这样写,在IDE中就能发现错误
val ds2 = ds1.filter(_.score < 60)
val ds3 = ds1.filter(_.age < 18)
// 打印
ds3.show( )

总体来说DS这种方式更加合理,并且更加人性化,比较适合程序员的开发及使用,而且Spark在2.X版本以后也在推行开发者开发中使用DS进行开发。

1.2.3 SparkSQL的编程入口
在SparkSQL中的编程模型,不在是SparkContext,但是创建需要依赖SparkContext。SparkSQL中的编程模型,在spark2.0以前的版本中为SQLContext和HiveContext,HiveContext是SQLContext的一个子类,提供Hive中特有的一些功能,比如row_number开窗函数等等,这是SQLContext所不具备的,在Spark2.0之后将这两个进行了合并——SparkSession。SparkSession的构建需要依赖SparkConf或者SparkContext。使用工厂构建器(Builder方式)模式创建SparkSession。
1.2.4 SparkSQL基本编程

创建SparkSQL的模块

创建工程省略,直接在原有工程引入Pom即可

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version>
</dependency>
1.2.5 SparkSQL编程初体验
  • SparkSession的构建

val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")
//.enableHiveSupport()//支持hive的相关操作.getOrCreate()
  • 基本编程

object _01SparkSQLOps {def main(args: Array[String]): Unit = {
​val spark = SparkSession.builder().appName("SparkSQLOps").master("local[*]")
//                .enableHiveSupport()//支持hive的相关操作.getOrCreate()//加载数据val pdf:DataFrame = spark.read.json("file:///E:/data/spark/sql/people.json")//二维表结构pdf.printSchema()//数据内容 select * from tblpdf.show()//具体的查询 select name, age from tblpdf.select("name", "age").show()import spark.implicits._//导入sparksession中的隐式转换操作,增强sql的功能pdf.select($"name",$"age").show()//列的运算,给每个人的年龄+10 select name, age+10,height-1 from tblpdf.select($"name",$"height" - 1, new Column("age").+(10)).show()//起别名  select name, age+10 as age,height-1  as height from tblpdf.select($"name",($"height" - 1).as("height"), new Column("age").+(10).as("age")).show()//做聚合统计 统计不同年龄的人数 select age, count(1) counts from tbl group by agepdf.select($"age").groupBy($"age").count().show()//条件查询 获取年龄超过18的用户  select * from tbl where age > 18// pdf.select("name", "age", "height").where($"age".>(18)).show()pdf.select("name", "age", "height").where("age > 18").show()//sql//pdf.registerTempTable()//在spark2.0之后处于维护状态,使用createOrReplaceTempView/*从使用范围上说,分为global和非globalglobal是当前SparkApplication中可用,非global只在当前SparkSession中可用从创建的角度上说,分为createOrReplace和不ReplacecreateOrReplace会覆盖之前的数据create不Replace,如果视图存在,会报错*/pdf.createOrReplaceTempView("people")// 使用SQL语法进行处理spark.sql("""|select| age,| count(1) as countz|from people|group by age""".stripMargin).show// 打印输出spark.stop()}
}
1.2.6 SparkSQL编程模型的操作

DataFrame的构建方式

构建方式有两,一种通过Javabean+反射的方式来进行构建;还有一种的话通过动态编码的方式来构建。

  • JavaBean+反射

import org.apache.spark.sql.{DataFrame, SparkSession}
​
/*** 创建DataFrame方式* 反射方式*/
object _02SparkSQLCreateDFOps {def main(args: Array[String]): Unit = {// 创建执行入口val spark = SparkSession.builder().appName("createDF").master("local").getOrCreate()// 创建集合数据,并将数据封装到样例类中val list = List(student(1,"王凯",0,23),student(2,"赵凯",0,32),student(3,"姜华劲",1,24))// 导入隐式转换import spark.implicits._// 创建DF,创建DF的同时可以进行字段名重命名val df: DataFrame = list.toDF("ids","names","genders","ages")// 打印输出df.printSchema()df.show()// 关闭spark.stop()}
}
// 构建反射方式的样例类
case class student(id:Int,name:String,gender:Int,age:Int)
  • 动态编程

object _02SparkSQLDataFrameOps {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().master("local[*]").appName("SparkSQLDataFrame").getOrCreate()
/*使用动态编程的方式构建DataFrameRow-->行,就代表了二维表中的一行记录,jdbc中的resultset,就是java中的一个对象*/
val row:RDD[Row] = spark.sparkContext.parallelize(List(Row(1, "李伟", 1, 180.0),Row(2, "汪松伟", 2, 179.0),Row(3, "常洪浩", 1, 183.0),Row(4, "麻宁娜", 0, 168.0)
))
//表对应的元数据信息
val schema = StructType(List(StructField("id", DataTypes.IntegerType, false),StructField("name", DataTypes.StringType, false),StructField("gender", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)
))
​
val df = spark.createDataFrame(row, schema)
df.printSchema()
df.show()}
}

说明:这里学习三个新的类:

Row:代表的是二维表中的一行记录,或者就是一个Java对象

StructType:是该二维表的元数据信息,是StructField的集合

StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)

  • 总结: 这两种方式,都是非常常用,但是动态编程更加的灵活,因为javabean的方式的话,提前要确定好数据格式类型,后期无法做改动。

Dataset的构建方式

Dataset是DataFrame的升级版,创建方式和DataFrame类似,但有不同。

//dataset的构建
object _03SparkSQLDatasetOps {def main(args: Array[String]): Unit = {
​val spark = SparkSession.builder().appName("SparkSQLDataset").master("local[*]").getOrCreate()
​//dataset的构建val list = List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29))import spark.implicits._val ds = spark.createDataset[Student](list)ds.printSchema()ds.show()spark.stop()}
}
case class Student(id:Int, name:String, gender:Int, age:Int)

注意:出现如下错误

在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过

RDD和DataFrame以及DataSet的互相转换

RDD--->DataFrame

    def beanRDD2DataFrame(spark:SparkSession): Unit = {val stuRDD:RDD[Student] = spark.sparkContext.parallelize(List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29)))val sdf =spark.createDataFrame(stuRDD, classOf[Student])sdf.printSchema()sdf.show()}

RDD--->Dataset

def  rdd2Dataset(spark:SparkSession): Unit = {val stuRDD = spark.sparkContext.parallelize(List(Student(1, "王盛芃", 1, 19),Student(2, "李金宝", 1, 49),Student(3, "张海波", 1, 39),Student(4, "张文悦", 0, 29)))import spark.implicits._val ds:Dataset[Student] = spark.createDataset(stuRDD)
​ds.show()
}   
case class Student(id:Int, name:String, gender:Int, age:Int)

在RDD转换为DataFrame和Dataset的时候可以有更加简单的方式

import spark.implicits._
rdd.toDF()
rdd.toDS()

DataFrame--->RDD

val rdd:RDD[Row] = df.rdd
rdd.foreach(row => {//            println(row)val id = row.getInt(0)val name = row.getString(1)val gender = row.getInt(2)val height = row.getAs[Double]("height")println(s"id=${id},name=$name,gender=$gender,height=$height")
})

DataFrame--->Dataset

无法直接将DataFrame转化为Dataset

Dataset --->RDD

val stuDS: Dataset[Student] = list2Dataset(spark)//dataset --> rdd
val stuRDD:RDD[Student] = stuDS.rdd
stuRDD.foreach(println)

Dataset--->DataFrame

val stuDS: Dataset[Student] = list2Dataset(spark)      
//dataset --->dataframe
val df:DataFrame = stuDS.toDF()
df.show()

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/222024.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【科技前沿】数字孪生技术改革智慧供热,换热站3D可视化引领未来

换热站作为供热系统不可或缺的一部分&#xff0c;其能源消耗对城市环保至关重要。在双碳目标下&#xff0c;供热企业可通过搭建智慧供热系统&#xff0c;实现供热方式的低碳、高效、智能化&#xff0c;从而减少碳排放和能源浪费。通过应用物联网、大数据等高新技术&#xff0c;…

数据库操作习题12.12

考虑如下的人员数据&#xff0c;其中加下划线的是主码&#xff0c;数据库模式由四个关系组成: employee (empname, street, city) works (empname, compname, salary) company(id, compname, city) managers (empname, mgrname) 其中 关系 employee 给出人员的基本信息,包括人员…

【hadoop】解决浏览器不能访问Hadoop的50070、8088等端口?!

【hadoop】解决浏览器不能访问Hadoop的50070、8088等端口&#xff1f;&#xff01;&#x1f60e; 前言&#x1f64c;【hadoop】解决浏览器不能访问Hadoop的50070、8088等端口&#xff1f;&#xff01;查看自己的配置文件&#xff1a;最终成功访问如图所示&#xff1a; 总结撒花…

2023年度佳作:AIGC、AGI、GhatGPT 与人工智能大模型的创新与前景展望

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《linux深造日志》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 写在前面参与规则 ✅参与方式&#xff1a;关注博主、点赞、收藏、评论&#xff0c;任意评论&#xff08;每人最多评论…

Unity | Shader基础知识(第八集:案例<漫反射材质球>)

目录 一、本节介绍 1 上集回顾 2 本节介绍 二、什么是漫反射材质球 三、 漫反射进化史 1 三种算法结果的区别 2 具体算法 2.1 兰伯特逐顶点算法 a.本小节使用的unity自带结构体。 b.兰伯特逐顶点算法公式 c.代码实现——兰伯特逐顶点算法 2.2 代码实现——兰伯特逐…

基于MLP完成CIFAR-10数据集和UCI wine数据集的分类

基于MLP完成CIFAR-10数据集和UCI wine数据集的分类&#xff0c;使用到了sklearn和tensorflow&#xff0c;并对图片分类进行了数据可视化展示 数据集介绍 UCI wine数据集&#xff1a; http://archive.ics.uci.edu/dataset/109/wine 这些数据是对意大利同一地区种植的葡萄酒进…

Linux调试器gdb的用法

Linux调试器gdb的用法 1. debug/release版本之间的比较2. gdb调试器的基本指令3. 使用展示 1. debug/release版本之间的比较 在之前学习C语言的的时候出过一期vs的调试技巧。 而对于现在的Linux下的调试器gdb其实也是换汤不换药的&#xff0c;基本上的调试思路是不会改变的&am…

http -- 跨域问题详解(浏览器)

参考链接 参考链接 1. 跨域报错示例 Access to XMLHttpRequest at http://127.0.0.1:3000/ from origin http://localhost:3000 has been blocked by CORS policy: Response to preflight request doesnt pass access control check: No Access-Control-Allow-Origin header…

【Java 集合】LinkedBlockingDeque

在开始介绍 LinkedBlockingDeque 之前, 我们先看一下 LinkedBlockingDeque 的类图: 从其中可以看出他直接实现了 BlockingDeque 接口, 而 BlockingDeque 又实现了 BlockingQueue 的接口, 所以它本身具备了队列的特性。 而实现 BlockingDeque 使其在 BlockingQueue 的基础上多了…

Spring Boot自动装配原理以及实践

了解自动装配两个核心 Import注解的作用 Import说Spring框架经常会看到的注解&#xff0c;它有以下几个作用: 导入Configuration类下所有的bean方法中创建的bean。导入import指定的bean&#xff0c;例如Import(AService.class)&#xff0c;就会生成AService的bean&#xff0…

获取请求体中json数据并解析到实体对象

目录 相关依赖 前端代码 后端代码 测试结果 相关依赖 <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version> </dependency> <dependency><groupId>comm…

02 ModBus TCP

目录 一、ModBus TCP 一帧数据格式 二、0x01 读线圈状态 三、0x03读保持寄存器 四、0x05写单个线圈 五、0x06 写单个寄存器 六、0x0f写多个线圈 七、0x10&#xff1a;写多个保持寄存器 八、通信过程 九、不同modbus通信模式的应用场景 一、ModBus TCP 一帧数据格式 其…

fill-in-the-middle(FIM) 实现与简单应用

1 背景 传统训练的 GPT 模型只能根据前文内容预测后文内容&#xff0c;但有些应用比如代码生成器&#xff0c;需要我们给出上文和下文&#xff0c;使模型可以预测中间的内容&#xff0c;传统训练的 GPT 就不能完成这类任务。 传统训练的 GPT 只能根据上文预测下文 使用 FIM…

基于Pytest+Requests+Allure实现接口自动化测试

一、整体结构 框架组成&#xff1a;pytestrequestsallure 设计模式&#xff1a; 关键字驱动 项目结构&#xff1a; 工具层&#xff1a;api_keyword/ 参数层&#xff1a;params/ 用例层&#xff1a;case/ 数据驱动&#xff1a;data_driver/ 数据层&#xff1a;data/ 逻…

玩转大数据19:数据治理与元数据管理策略

随着大数据时代的到来&#xff0c;数据已经成为企业的重要资产。然而&#xff0c;如何有效地管理和利用这些数据&#xff0c;成为了一个亟待解决的问题。数据治理和元数据管理是解决这个问题的关键。 1.数据治理的概念和重要性 数据治理是指对数据进行全面、系统、规范的管理…

MLOps在极狐GitLab 的现状和前瞻

什么是 MLOps 首先我们可以这么定义机器学习&#xff08;Machine Learning&#xff09;&#xff1a;通过一组工具和算法&#xff0c;从给定数据集中提取信息以进行具有一定程度不确定性的预测&#xff0c;借助于这些预测增强用户体验或推动内部决策。 同一般的软件研发流程比…

行为型设计模式(一)模版方法模式 迭代器模式

模板方法模式 Template 1、什么是模版方法模式 模版方法模式定义了一个算法的骨架&#xff0c;它将其中一些步骤的实现推迟到子类里面&#xff0c;使得子类可以在不改变算法结构的情况下重新定义算法中的某些步骤。 2、为什么使用模版方法模式 封装不变部分&#xff1a;模版…

vscode配置node.js调试环境

node.js基于VSCode的开发环境的搭建非常简单。 说明&#xff1a;本文的前置条件是已安装好node.js(具体安装不再赘述&#xff0c;如有需要可评论区留言)。 阅读本文可掌握&#xff1a; 方便地进行js单步调试&#xff1b;方便地查看内置的对象或属性&#xff1b; 安装插件 C…

RouterSrv-DHCP

2023年全国网络系统管理赛项真题 模块B-Windows解析 题目 安装和配置DHCP relay服务,为办公区域网络提供地址上网。DHCP服务器位于AppSrv服务器上。拆分DHCP服务器上的作用域,拆分的百分比为7:3。InsideCli优先从RouterSrv获取地址。配置步骤 安装和配置DHCP relay服务,为办…

AIGC:阿里开源大模型通义千问部署与实战

1 引言 通义千问-7B&#xff08;Qwen-7B&#xff09;是阿里云研发的通义千问大模型系列的70亿参数规模的模型。Qwen-7B是基于Transformer的大语言模型, 在超大规模的预训练数据上进行训练得到。预训练数据类型多样&#xff0c;覆盖广泛&#xff0c;包括大量网络文本、专业书籍…