Spark SQL基础

SparkSQL基本介绍

什么是Spark SQL

Spark SQL是Spark多种组件中其中一个,主要是用于处理大规模的结构化数据

什么是结构化数据: 一份数据, 每一行都有固定的列, 每一列的类型都是一致的 我们将这样的数据称为结构化的数据
例如: mysql的表数据
1 张三 20
2 李四 15
3 王五 18
4 赵六 12

Spark SQL 的优势

1- Spark SQL 既可以编写SQL语句, 也可以编写代码, 甚至可以混合使用
2- Spark SQL 可以 和 HIVE进行集成, 集成后, 可以替换掉HIVE原有MR的执行引擎, 提升效率

Spark SQL特点:

1- 融合性: 既可以使用标准SQL语言, 也可以编写代码, 同时支持混合使用2- 统一的数据访问: 可以通过统一的API来对接不同的数据源3- HIVE的兼容性: Spark SQL可以和HIVE进行整合, 整合后替换执行引擎为Spark, 核心: 基于HIVE的metastore来处理4- 标准化连接: Spark SQL也是支持 JDBC/ODBC的连接方式

Spark SQL与HIVE异同

相同点:

1- 都是分布式SQL计算引擎
2- 都可以处理大规模的结构化数据
3- 都可以建立Yarn集群之上运行

不同点:

1- Spark SQL是基于内存计算, 而HIVE SQL是基于磁盘进行计算的
2- Spark SQL没有元数据管理服务(自己维护), 而HIVE SQL是有metastore的元数据管理服务的
3- Spark SQL底层执行Spark RDD程序, 而HIVE SQL底层执行是MapReduce
4- Spark SQL可以编写SQL也可以编写代码,但是HIVE SQL仅能编写SQL语句

Spark SQL的数据结构对比

在这里插入图片描述

说明:pandas的DataFrame: 二维表  处理单机结构数据Spark Core: 处理任何的数据结构   处理大规模的分布式数据Spark SQL: 二维表  处理大规模的分布式结构数据

在这里插入图片描述

RDD: 存储直接就是对象, 比如在图中, 存储就是一个Person的对象, 但是里面是什么数据内容, 不太清楚DataFrame: 将Person的中各个字段数据, 进行结构化存储, 形成一个DataFrame, 可以直接看到数据Dataset: 将Person对象中数据都按照结构化的方式存储好, 同时保留的对象的类型, 从而知道来源于一个Person对象由于Python不支持泛型, 所以无法使用Dataset类型, 客户端仅支持DataFrame类型

Spark SQL构建SparkSession对象

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 创建SparkSQL中的顶级对象SparkSession# alt+回车"""注意事项:1- SparkSession和builder都没有小括号2- appName():给应用程序取名词。等同于SparkCore中的setAppName()3- master():设置运行时集群类型。等同于SparkCore中的setMaster()"""spark = SparkSession.builder\.appName('create_sparksession_demo')\.master('local[*]')\.getOrCreate()# 通过SparkSQL的顶级对象获取SparkCore中的顶级对象sc = spark.sparkContext# 释放资源sc.stop()spark.stop()

DataFrame详解

DataFrame基本介绍

在这里插入图片描述

DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息表结构描述信息(元数据Schema): StructType对象
字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空
行: Row对象
列: Column对象,包含字段名称和字段值在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

如何构建表结构信息数据:
在这里插入图片描述

DataFrame的构建方式

通过RDD得到一个DataFrame

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('rdd_2_dataframe')\.master('local[*]')\.getOrCreate()# 通过SparkSession得到SparkContextsc = spark.sparkContext# 2- 数据输入# 2.1- 创建一个RDDinit_rdd = sc.parallelize(["1,李白,20","2,安其拉,18"])# 2.2- 将RDD的数据结构转换成二维结构new_rdd = init_rdd.map(lambda line: (int(line.split(",")[0]),line.split(",")[1],int(line.split(",")[2])))# 将RDD转成DataFrame:方式一# schema方式一schema = StructType()\.add('id',IntegerType(),False)\.add('name',StringType(),False)\.add('age',IntegerType(),False)# schema方式二schema = StructType([StructField('id',IntegerType(),False),StructField('name',StringType(),False),StructField('age',IntegerType(),False)])# schema方式三schema = "id:int,name:string,age:int"# schema方式四schema = ["id","name","age"]init_df = spark.createDataFrame(data=new_rdd,schema=schema)# 将RDD转成DataFrame:方式二"""toDF:中的schema既可以传List,也可以传字符串形式的schema信息"""# init_df = new_rdd.toDF(schema=["id","name","age"])init_df = new_rdd.toDF(schema="id:int,name:string,age:int")# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源sc.stop()spark.stop()

运行结果截图:
在这里插入图片描述
场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么我可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。

内部初始化数据得到DataFrame

from pyspark import SparkConf, SparkContext
import os# 绑定指定的Python解释器
from pyspark.sql import SparkSessionos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("内部初始化数据得到DataFrame。类似SparkCore中的parallelize")# 1- 创建SparkSession顶级对象spark = SparkSession.builder\.appName('inner_create_dataframe')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""通过createDataFrame创建DataFrame,schema数据类型可以是:DataType、字符串、List字符串:格式要求格式一 字段1 字段类型,字段2 字段类型格式二(推荐) 字段1:字段类型,字段2:字段类型List:格式要求["字段1","字段2"]"""# 内部初始化数据得到DataFrameinit_df = spark.createDataFrame(data=[(1,'张三',18),(2,'李四',30)],schema="id:int,name:string,age:int")# init_df = spark.createDataFrame(#     data=[(1, '张三', 18), (2, '李四', 30)],#     schema="id int,name string,age int"# )# init_df = spark.createDataFrame(#     data=[(1, '张三', 18), (2, '李四', 30)],#     schema=["id","name","age"]# )# init_df = spark.createDataFrame(#     data=[(1, '张三', 18), (2, '李四', 30)],#     schema=["id:int", "name:string", "age:int"]# )# 3- 数据处理# 4- 数据输出# 输出dataframe的数据内容init_df.show()# 输出dataframe的schema信息init_df.printSchema()# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述
场景:一般用在开发和测试中。因为只能处理少量的数据
Schema总结
通过createDataFrame创建DataFrame,schema数据类型可以是:DataType、字符串、List
1: 字符串
格式一 字段1 字段类型,字段2 字段类型
格式二(推荐) 字段1:字段类型,字段2:字段类型

2: List
[“字段1”,“字段2”]

3: DataType(推荐,用的最多)
格式一 schema = StructType()
.add(‘id’,IntegerType(),False)
.add(‘name’,StringType(),True)
.add(‘age’,IntegerType(),False)

格式二 schema = StructType([
StructField(‘id’,IntegerType(),False),
StructField(‘name’,StringType(),True),
StructField(‘age’,IntegerType(),False)
])

读取外部文件

复杂API

统一API格式: 
sparksession.read.format('text|csv|json|parquet|orc|avro|jdbc|.....') # 读取外部文件的方式.option('k','v') # 选项  可以设置相关的参数 (可选).schema(StructType | String) #  设置表的结构信息.load('加载数据路径') # 读取外部文件的路径, 支持 HDFS 也支持本地

简写API

格式: spark.read.读取方式()例如: df = spark.read.csv(path='file:///export/data/spark_sql/data/stu.txt',header=True,sep=' ',inferSchema=True,encoding='utf-8',)
Text方式读取
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("text方式读取文件")# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('text_demo')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""load:支持读取HDFS文件系统和本地文件系统HDFS文件系统:hdfs://node1:8020/文件路径本地文件系统:file:///文件路径text方式读取文件总结:1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理2- 默认生成的列名叫value,数据类型string3- 我们只能够在schema中修改字段value的名称,其他任何内容不能修改"""init_df = spark.read\.format('text')\.schema("my_field string")\.load('file:///export/data/stu.txt')# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述
text方式读取文件总结:
1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理
2- 默认生成的列名叫value,数据类型string
3- 我们只能够在schema中修改字段value的名称,其他任何内容不能修改

CSV方式读取
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("csv方式读取文件")# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('csv_demo')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""csv格式读取外部文件总结:1- 复杂API和简写API都必须掌握2- 相关参数作用说明:2.1- path:指定读取的文件路径。支持HDFS和本地文件路径2.2- schema:手动指定元数据信息2.3- sep:指定字段间的分隔符2.4- encoding:指定文件的编码方式2.5- header:指定文件中的第一行是否是字段名称2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确"""# 复杂API写法init_df = spark.read\.format('csv')\.schema("id int,name string,address string,sex string,age int")\.option("sep"," ")\.option("encoding","UTF-8")\.option("header","True")\.load('file:///export/data/stu.txt')# 简写API写法# init_df = spark.read.csv(#     path='file:///export/data/gz16_pyspark/02_spark_sql/data/stu.txt',#     schema="id int,name string,address string,sex string,age int",#     sep=' ',#     encoding='UTF-8',#     header="True"# )# init_df = spark.read.csv(#     path='file:///export/data/stu.txt',#     sep=' ',#     encoding='UTF-8',#     header="True",#     inferSchema=True# )# 3- 数据处理# 4- 数据输出init_df.show()init_df.printSchema()# 5- 释放资源spark.stop()

csv格式读取外部文件总结:
1- 复杂API和简写API都必须掌握
2- 相关参数作用说明:
2.1- path:指定读取的文件路径。支持HDFS和本地文件路径
2.2- schema:手动指定元数据信息
2.3- sep:指定字段间的分隔符
2.4- encoding:指定文件的编码方式
2.5- header:指定文件中的第一行是否是字段名称
2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确

JSON方式读取

json的数据内容:

{'id': 1,'name': '张三','age': 20}
{'id': 2,'name': '李四','age': 23,'address': '北京'}
{'id': 3,'name': '王五','age': 25}
{'id': 4,'name': '赵六','age': 29}

代码实现

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象spark = SparkSession.builder\.appName('json_demo')\.master('local[*]')\.getOrCreate()# 2- 数据输入"""json读取数据总结:1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔"""# init_df = spark.read.json(#     path='file:///export/data.txt',#     schema="id2 int,name string,age int,address string",#     encoding='UTF-8'# )# init_df = spark.read.json(#     path='file:///export/data.txt',#     schema="id:int,name:string,age:int,address:string",#     encoding='UTF-8'# )init_df = spark.read.json(path='file:///export/data.txt',schema="id int,name string,age int,address string",encoding='UTF-8')# 3- 数据输出init_df.show()init_df.printSchema()# 4- 释放资源spark.stop()

运行结果截图:
在这里插入图片描述
json读取数据总结:
1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充
2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔

DataFrame的相关API

操作DataFrame一般有二种操作方案:一种为【DSL方式】,另一种为【SQL方式】

SQL方式: 通过编写SQL语句完成统计分析操作
DSL方式: 特定领域语言,使用DataFrame特有的API完成计算操作,也就是代码形式从使用角度来说: SQL可能更加的方便一些,当适应了DSL写法后,你会发现DSL要比SQL更好用
从Spark角度来说: 更推荐使用DSL方案,此种方案更加利于Spark底层的优化处理

SQL相关的API

  • 创建一个视图/表
df.createTempView('视图名称'): 创建一个临时的视图(表名)
df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换
临时视图,仅能在当前这个Spark Session的会话中使用df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用
  • 执行SQL语句
spark.sql('书写SQL')

DSL相关的API

  • show():用于展示DF中数据, 默认仅展示前20行
    • 参数1:设置默认展示多少行 默认为20
    • 参数2:是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置)
  • printSchema():用于打印当前这个DF的表结构信息
  • select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样
  • filter()和 where():用于对数据进行过滤操作, 一般在spark SQL中主要使用where
  • groupBy():用于执行分组操作
  • orderBy():用于执行排序操作
DSL主要支持以下几种传递的方式:  str | Column对象 | 列表str格式:  '字段'Column对象:  DataFrame含有的字段  df['字段']执行过程新产生:  F.col('字段')列表: ['字段1','字段2'...][df['字段1'],df['字段2']]

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可

导入这个函数库: import pyspark.sql.functions as F
通过F调用对应的函数即可。SparkSQL中所支持的函数,都可以通过以下地址查询到: https://spark.apache.org/docs/3.1.2/api/sql/index.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/240245.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【揭秘AI】穿越时光隧道,探秘AI起源与发展01

算盘 被誉为世界上最古老的计算机之一,是一种手动操作的计算工具,起源于中国。它主要由框、梁和珠子组成,通过移动珠子在档位上的位置来进行加减乘除运算。算盘的发明时间可以追溯到公元前或公元初期,据历史记载,东汉…

Angular系列教程之MVC模式和MVVM模式

文章目录 MVC模式MVVM模式MVC与MVVM的区别Angular如何实现MVVM模式总结 在讨论Angular的时候,我们经常会听到MVC和MVVM这两种设计模式。这两种模式都是为了将用户界面(UI)和业务逻辑分离,使得代码更易于维护和扩展。在这篇文章中,我们将详细介…

游戏素材永不缺,免费在线AI工具Scenario功能齐全,简单易用

Scenario是一个在线的AI驱动的工具,主要用于游戏艺术创作。它提供了一套全面的功能,旨在帮助游戏开发者创建与其独特风格和艺术方向相符的独特、高质量的游戏艺术。Scenario的突出特点之一是它的微调能力,允许用户根据独特的风格和艺术方向训…

uniapp 编译后文字乱码的解决方案

问题: 新建的页面中编写代码,其中数字和图片都可以正常显示,只有中文编译后展示乱码 页面展示也是乱码 解决方案: 打开HuilderX编辑器的【文件】- 【以指定编码重新打开】- 【选择UTF-8】 然后重新编译就可以啦~ 希望可以帮到你啊~

如何利用小程序介绍公司品牌形象?

企业小程序的建设对于现代企业来说已经成为了一项必不可少的工作。随着移动互联网的快速发展,越来越多的职场人士和创业老板希望通过小程序来提升企业形象,增强与用户的互动,实现更好的商业效果。在这个过程中,使用第三方制作平台…

CMU15-445-Spring-2023-Project #3 - 前置知识(lec10-14)

Lecture #10_ Sorting & Aggregation Algorithms Query Plan 数据库系统会将 SQL 编译成查询计划。查询计划是一棵运算符树。 Sorting DBMS 需要对数据进行排序,因为根据关系模型,表中的tuple没有特定的顺序。排序使用 ORDER BY、GROUP BY、JOIN…

leedcode刷题笔记day1

题目大意: 暴力解法 两个for循环(也是我一看到题目想到的方法) 枚举在数组中所有的不同的两个下标的组合逐个检查它们所对应的数的和是否等于 target 复杂度分析 时间复杂度:O(n2),这里 n 为数组的长度 空间复杂度:O(1)&#x…

Python进程池multiprocessing.Pool

环境: 鲲鹏920:192核心 内存:756G python:3.9 python单进程的耗时 在做单纯的cpu计算的场景,使用单进程核多进程的耗时做如下测试: 单进程情况下cpu的占用了如下,占用一半的核心数: 每一步…

IOS-高德地图路径绘制-Swift

本文展示的是在IOS开发中调用高德地图进行驾车路径绘制,开发语言是Swift。 IOS高德地图集成请看:IOS集成高德地图Api 使用路径规划功能需要集成高德地图的搜索功能。 pod AMapSearch定义AMapSearchAPI 定义主搜索对象 AMapSearchAPI ,并继承…

Python爬虫实战:IP代理池助你突破限制,高效采集数据

当今互联网环境中,为了应对反爬虫、匿名访问或绕过某些地域限制等需求,IP代理池成为了一种常用的解决方案。IP代理池是一个包含多个可用代理IP地址的集合,可以通过该代理池随机选择可用IP地址来进行网络请求。 IP代理池是一组可用的代理IP地址…

旧路由重置新路由设置新路由设置教程|适用于PPPoE拨号

前言 前几天朋友说路由器想要重置,但不知道怎么弄。所以就想着只帮忙重置路由器的话,只能帮到一个人。但把整个过程写成图文,就可以帮助更多人。 本文章适合电脑小白,请注意每一步哦! 注意事项 开始之前需要确认光猫…

FastAdmin上传图片服务端压缩图片,实测13.45M压缩为29.91K

先前条件:第一步安装compose,已安装忽略。 先上截图看效果 一、在fastadmin的根目录里面输入命令安装think-image composer require topthink/think-image二、找到公共上传类,application/common/library/Upload.php,在最下面…

【问题记录】使用命令语句从kaggle中下载数据集

从Kaggle中下载Tusimple数据集 1.服务器环境中安装kaggle 使用命令:pip install kaggle 2.复制下载API 具体命令如下: kaggle datasets download -d manideep1108/tusimple3.配置kaggle.json文件 如果直接使用命令会报错: root:~# kagg…

Java重修第十天—代码进阶

第十天代码进阶&#xff0c;完成以下四个题目&#xff0c;提高编程能力。 第一题 代码实现 package cn.msf.baseJava.d_14;import java.util.*;public class Test1 {public static void main(String[] args) {Random r new Random();ArrayList<Integer> p new ArrayL…

transbigdata笔记:其他方法

1 出租车相关 1.1 taxigps_to_od 提取出租车OD信息 transbigdata.taxigps_to_od(data, col[VehicleNum, Stime, Lng, Lat, OpenStatus]) 输入出租车GPS数据&#xff0c;提取OD信息 data出租车GPS数据col[VehicleNum, Time, Lng, Lat, OpenStatus]五列 比如GPS数据长这样&am…

利用Wireshark分析IP协议

实验.利用Wireshark分析IP协议 一&#xff0e;实验目的 1.掌握Wireshark软件简单的过滤语法 2.掌握IP数据报的组成格式 3.掌握IP分片的计算方法 4.学会利用Wireshark抓包分析IP协议 二&#xff0e;实验环境 1.Wireshark软件 2.Windows 计算机 三&#xff0e;实验预备知识 1.IP…

【Qt】Qt配置

需要云服务器等云产品来学习Linux的同学可以移步/-->腾讯云<--/-->阿里云<--/-->华为云<--/官网&#xff0c;轻量型云服务器低至112元/年&#xff0c;新用户首次下单享超低折扣。 目录 一、Qt SDK下载 二、配置环境变量 三、新建工程(QWidget) 四、QWidg…

【小白专用】C# 连接 MySQL 数据库

C# – Mysql 数据库连接 1. 配置环境 #前提&#xff1a;电脑已安装Mysql服务&#xff1b; Visual Studio 安装Mysql依赖库&#xff1a; 工具 -> NuGet 包管理器 -> 管理解决方案的 NuGet程序包 —> 搜索&#xff0c; 安装Mysql.Data (Oracle); (安装成功后&…

ASP.NET Core 的 Web Api 实现限流 中间件

Microsoft.AspNetCore.RateLimiting 中间件提供速率限制&#xff08;限流&#xff09;中间件。 它是.NET 7 以上版本才支持的中间件&#xff0c;刚看了一下&#xff0c;确实挺好用&#xff0c;下面给大家简单介绍一下&#xff1a; RateLimiterOptionsExtensions 类提供下列用…

【AI视野·今日Robot 机器人论文速览 第七十三期】Tue, 9 Jan 2024

AI视野今日CS.Robotics 机器人学论文速览 Tue, 9 Jan 2024 Totally 40 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers Digital Twin for Autonomous Surface Vessels for Safe Maritime Navigation Authors Daniel Menges, Andreas Von Brandis, A…