Spark SQL概述与基本操作

目录

一、Spark SQL概述

        (1)概念

        (2)特点

        (3)Spark SQL与Hive异同

        (4)Spark的数据抽象

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        (2)代码演示

三、DataFrame创建

        (1)DataFrame组成

        (2)DataFrame创建方式(转换)

        (3)DataFrame创建方式(标准API读取)

四、DataFrame编程

        (1)DSL语法风格

        (2)SQL语法风格

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        (2)代码示例


一、Spark SQL概述

        (1)概念

        Spark SQL是Apache Spark的一个模块,它用于处理结构化和半结构化的数据。Spark SQL允许用户使用SQL查询和操作数据,这种操作可以直接在Spark的DataFrame/Dataset API中进行。此外,Spark SQL还支持多种语言,包括Scala、Java、Python和R。

        (2)特点

        ①融合性:SQL可以无缝集成在代码中,随时用SQL处理数据。

        ②统一数据访问:一套标准API可读写不同的数据源。

        ③Hive兼容:可以使用Spark SQL直接计算生成Hive数据表。

        ④标准化连接:支持标准化JDBC \ ODBC连接,方便和各种数据库进行数据交互。

        (3)Spark SQL与Hive异同

        共同点:Hive和Spark均是:“分布式SQL计算引擎”,均是构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能。

        (4)Spark的数据抽象

        Spark SQL的数据抽象:

        Data Frame与RDD:

二、Spark Session对象执行环境构建

          (1)Spark Session对象

        在RDD阶段,程序的执行入口对象是:SparkContext。在Spark 2.0后,推出了SparkSession对象,作为Spark编码的统一入口对象。

        Spark Session对象作用:

        ①用于SparkSQL编程作为入口对象。

        ②用于SparkCore编程,可以通过Spark Session对象中获取到Spark Context。

        (2)代码演示
# cording:utf8# Spark Session对象的导包,对象是来自于pyspark.sql包中
from pyspark.sql import SparkSession
if __name__ == '__main__':# 构建Spark Session执行环境入口对象spark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 通过Spark Session对象 获取SparkContext对象sc = spark.sparkContext# SparkSQL测试df = spark.read.csv('../input/stu_score.txt', sep=',', header=False)df2 = df.toDF('id', 'name', 'score')# 打印表结构# df2.printSchema()# 打印数据内容# df2.show()df2.createTempView('score')# SQL风格spark.sql("""SELECT * FROM score WHERE name='语文' LIMIT 5""").show()# DSL 风格df2.where("name='语文'").limit(5).show()

三、DataFrame创建

        (1)DataFrame组成

        DataFrame是一个二维表结构,表格结构的组成:

                ①行

                ②列

                ③表结构描述

        比如,在MySQL中的一个表:

                ①有许多列组成

                ②数据也被分为多个列

                ③表也有表结构信息(列、列名、列类型、列约束等)

        基于这个前提下,DataFrame的组成如下:

                在结构层面:

                        ①StructType对象描述整个DataFrame的表结构

                        ②StructField对象描述一个列的信息

                在数据层面:

                        ①Row对象记录一行数据

                        ②Column对象记录一列数据并包含列的信息

        (2)DataFrame创建方式(转换)

        ①基于RDD方式

# cording:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 构建执行环境对象Spark Sessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 构建SparkContextsc = spark.sparkContext# 基于RDD转换为DataFramerdd = sc.textFile('../input/people.txt').\map(lambda x: x.split(',')).\map(lambda x: (x[0], int(x[1])))# 构建DataFrame对象# 参数1,被转换的RDD# 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可df = spark.createDataFrame(rdd,schema=['name', 'age'])# 打印Data Frame的表结构df.printSchema()# 打印df中的数据# 参数1,表示 展示出多少条数据,默认不传的话是20# 参数2,表示是否对列进行截断,如果列的数据长度超过20个字符串长度,厚旬欸日不显示,以....代替# 如果给False 表示不截断全部显示,默认是Truedf.show(20,False)# 将DF对象转换成临时视图表,可供sql语句查询df.createOrReplaceTempView('people')spark.sql('SELECT * FROM people WHERE age < 30').show()

        ②通过StructType对象来定义DataFrame的 ‘ 表结构 ’ 转换RDD

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':# 构建执行环境对象Spark Sessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 构建SparkContextsc = spark.sparkContext# 基于RDD转换为DataFramerdd = sc.textFile('../input/people.txt').\map(lambda x: x.split(',')).\map(lambda x: (x[0], int(x[1])))# 构建表结构的描述对象:StructType 对象# 参数1,列名# 参数2,列数据类型# 参数3,是否允许为空schema = StructType().add('name', StringType(), nullable=True).\add('age', IntegerType(), nullable=False)# 构建DataFrame对象# 参数1,被转换的RDD# 参数2,指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可df = spark.createDataFrame(rdd, schema=schema)df.printSchema()df.show()

        ③通过RDD的toDF方法创建RDD

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
if __name__ == '__main__':# 构建执行环境对象Spark Sessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()# 构建SparkContextsc = spark.sparkContext# 基于RDD转换为DataFramerdd = sc.textFile('../input/people.txt').\map(lambda x: x.split(',')).\map(lambda x: (x[0], int(x[1])))# toDF构建DataFrame# 第一种构建方式,只能设置列名,列类型靠RDD推断,默认允许为空df1 = rdd.toDF(['name', 'name'])df1.printSchema()df1.show()# toDF方式2:通过StructType来构造# 设置全面,能设置列名、列数据类型、是否为空# 构建表结构的描述对象:StructType 对象# 参数1,列名# 参数2,列数据类型# 参数3,是否允许为空schema = StructType().add('name', StringType(), nullable=True).\add('age', IntegerType(), nullable=False)df2 = rdd.toDF(schema=schema)df2.printSchema()df2.show()

        ④基于Pandas的DataFrame创建DataFrame

# cording:utf8from pyspark.sql import SparkSession
import pandas as pdif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 基于pandas的DataFrame构建SparkSQL的DataFrame对象pdf = pd.DataFrame({'id': [1, 2, 3],'name': ['张大仙', '王晓晓', '吕不韦'],'age': [1, 2, 3]})df = spark.createDataFrame(pdf)df.printSchema()df.show()

        (3)DataFrame创建方式(标准API读取)

        统一API示例代码:

        ①读取本地text文件

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 构建StructType,text数据源,# text读取数据的特点是:将一整行只作为一个列读取,默认列名是value 类型是Stringschema = StructType().add('data', StringType(),nullable=True)df = spark.read.format('text').\schema(schema=schema).\load('../input/people.txt')df.printSchema()df.show()

        ②读取json文件

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# json文件类型自带Schema信息df = spark.read.format('json').load('../input/people.json')df.printSchema()df.show()

        ③读取csv文件

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 读取csv文件df = spark.read.format('csv').\option('sep', ';').\option('header', True).\option('encoding', 'utf-8').\schema('name STRING, age INT, job STRING').\load('../input/people.csv')df.printSchema()df.show()

        ④读取parquet文件

        parquet文件:是Spark中常用的一种列式存储文件格式,和Hive中的ORC差不多,他们都是列存储格式。

        parquet对比普通的文本文件的区别:

                ①parquet内置schema(列名、列类型、是否为空)

                ②存储是以列作为存储格式

                ③存储是序列化存储在文件中的(有压缩属性体积小)

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 读取parquet文件df = spark.read.format('parquet').load('../input/users.parquet')df.printSchema()df.show()

四、DataFrame编程

        (1)DSL语法风格
# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContextdf = spark.read.format('csv').\schema('id INT, subject STRING, score INT').\load('../input/stu_score.txt')# Column对象的获取id_column = df['id']subject_column = df['subject']# DLS风格df.select(['id', 'subject']).show()df.select('id', 'subject').show()df.select(id_column, subject_column).show()# filter APIdf.filter('score < 99').show()df.filter(df['score'] < 99).show()# where APIdf.where('score < 99').show()df.where(df['score'] < 99).show()# group By API# df.groupBy API的返回值为 GroupedData类型1# GroupedData对象不是DataFrame# 它是一个 有分组关系的数据结构,有一些API供我们对分组做聚合# SQL:group by 后接上聚合: sum avg count min max# GroupedData 类似于SQL分组后的数据结构,同样由上述5中聚合方法# GroupedData 调用聚合方法后,返回值依旧是DayaFrame# GroupedData 只是一个中转的对象,最终还是会获得DataFrame的结果df.groupBy('subject').count().show()df.groupBy(df['subject']).count().show()
        (2)SQL语法风格

        DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql()来执行SQL语句查询,结果返回一个DataFrame。
        如果想使用SQL风格的语法,需要将DataFrame注册成表,采用如下的方式:        

df.createTempView( "score")            #注册一个临时视图(表)
df.create0rReplaceTempView("score")    #注册一个临时表,如果存在进行替换。
df.createGlobalTempView( "score")      #注册一个全局表

        全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询前带上前缀:
        global_temp.
        临时表:只在当前SparkSession中可用

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringTypeif __name__ == '__main__':# 构建执行环境入口对象SparkSessionspark = SparkSession.builder.\appName('test').\master('local[*]').\getOrCreate()sc = spark.sparkContextdf = spark.read.format('csv').\schema('id INT, subject STRING, score INT').\load('../input/stu_score.txt')# 注册成临时表df.createTempView('score')              # 注册临时视图(表)df.createOrReplaceTempView('score_2')   # 注册或者替换为临时视图df.createGlobalTempView('score_3')      # 注册全局临时视图 全局临时视图使用的时候 需要在前面带上global_temp. 前缀# 可以通过SparkSession对象的sql api来完成sql语句的执行spark.sql("SELECT subject, COUNT(*) AS cnt FROM score GROUP BY subject").show()spark.sql("SELECT subject, COUNT(*) AS cnt FROM score_2 GROUP BY subject").show()spark.sql("SELECT subject, COUNT(*) AS cnt FROM global_temp.score_3 GROUP BY subject").show()

五、Spark SQL——wordcount代码示例

        (1)pyspark.sql.functions包

        这个包里面提供了一系列的计算函数供SparkSQL使用

        导包:from pyspark.sql import functions as F

        这些函数返回值多数都是Column对象。

        (2)代码示例
# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql import functions as Fif __name__ == '__main__':spark = SparkSession.builder.appName('wordcount').master('local[*]').getOrCreate()sc = spark.sparkContext# TODO 1:SQL风格进行处理rdd = sc.textFile('../input/words.txt').\flatMap(lambda x: x.split(' ')).\map(lambda x: [x])df = rdd.toDF(['word'])# 注册DF为表格df.createTempView('words')spark.sql('SELECT word,COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC').show()# TODO 2:DSL 风格处理df = spark.read.format('text').load('../input/words.txt')# withColumn 方法# 方法功能:对已存在的列进行操作,返回一个新的列,如果名字和老列相同,那么替换,否则作为新列存在df2 = df.withColumn('value', F.explode(F.split(df['value'], ' ')))df2.groupBy('value').\count().\withColumnRenamed('value', 'word').\withColumnRenamed('count', 'cnt').\orderBy('cnt', ascending=False).show()# withColumnRenamed() 对列名进行重命名# orderBy() 排序

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/170816.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频相关学习笔记

YUV 和rgb一样是一种表示色彩的格式&#xff0c;Y表示亮度&#xff0c;UV表示色度&#xff08;U是蓝色投影&#xff0c;V是红色投影&#xff09;&#xff0c;只有Y就是黑白的&#xff0c;所以这个格式的视频图片可以兼容黑白电视&#xff0c;所以彩色电视使用的都是YUV 存储方…

JS清除字符串中的空格

一、replace()方法 replace方法在字符串中搜索值或正则表达式&#xff0c;返回已替换值的新字符串&#xff0c;不会更改原始字符串。 去除字符串内所有的空格&#xff1a;str str.replace(/\s*/g,“”) 去除字符串内两头的空格&#xff1a;str str.replace(/^\s*|\s*$/g,“…

PDF编辑工具Acrobat Pro DC 2023中文

Acrobat Pro DC 2023是一款全面、高效的PDF编辑和管理软件。它提供了丰富的PDF编辑功能&#xff0c;如创建、编辑、合并、分割、压缩、旋转、裁剪等&#xff0c;让用户可以轻松处理各种PDF文档。同时&#xff0c;该软件还具有智能的PDF处理技术&#xff0c;可以自动识别和修复P…

Autojs 利用OpenCV识别棋子之天天象棋你马没了

本例子通过代码像你介绍利用OpenCV实现霍尔找圆的方法定位棋子位置 通过autojs脚本实现自动点击棋子 开源地址 https://github.com/Liberations/TtxqYourHorseIsGone/blob/master/main.js AutoXJs https://github.com/kkevsekk1/AutoX/releasesauto() //安卓版本高于Android 9…

【网络协议】聊聊TCP的三挥四握

上一篇我们说了网络其实是不稳定的&#xff0c;TCP和UDP其实是两个不同的对立者&#xff0c;所以TCP为了保证数据在网络中传输的可靠性&#xff0c;从丢包、乱序、重传、拥塞等场景有自己的一套打法。 TCP格式 源端口和目标端口是不可缺少的&#xff0c;用以区分到达发送给拿…

软件测试行情不好,我还是啃下了27K的offer

o “会代码吗&#xff1f;” o “会&#xff0c;Java、Python我都会一些&#xff01;” o “有没有用代码开发过一些测试工具平台呢&#xff1f;” o “额。。。这个。。。没做过。。。” o “那你回去等消息吧” 软件测试行业发展到今天&#xff0c;测试人员会代码&#x…

牛客网刷题-(6)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

Android [SPI,AutoSerivce,ServiceLoader]

记录一下在Android中使用SPI的过程。 1.项目gralde文件。 plugins {id kotlin-kapt } dependencies {implementation com.google.auto.service:auto-service:1.0-rc7 kapt "com.google.auto.service:auto-service:1.0-rc7" } 这个AutoServ…

linux下安装 Chrome 和 chromedriver 以及 selenium webdriver 使用

1 安装 Chrome yum install https://dl.google.com/linux/direct/google-chrome-stable_current_x86_64.rpm2 下载 chromedriver # 进入下载目录 cd soft/crawler_tools# 查看chrome 版本号 google-chrome --version# 在chromedriver下载地址中找到对应版本&#xff0c;下载对…

电脑视频怎么转音频mp3

如果你在电脑上观看视频时喜欢上某个片段的背景音乐&#xff0c;且想将喜欢的背景音乐制作为手机铃声。我是建议你将此视频转换为 MP3 格式&#xff0c;因为 MP3 几乎与所有设备相兼容&#xff0c;让你可以在不同设备上不受限制地去聆听它。那该如何转换呢&#xff1f;无需担心…

目标跟踪ZoomTrack: Target-aware Non-uniform Resizing for Efficient Visual Tracking

论文作者&#xff1a;Yutong Kou,Jin Gao,Bing Li,Gang Wang,Weiming Hu,Yizheng Wang,Liang Li 作者单位&#xff1a;CASIA; University of Chinese Academy of Sciences; ShanghaiTech University; Beijing Institute of Basic Medical Sciences; People AI, Inc 论文链接&…

2023年腾讯云2核4G配置服务器性价比怎么样?

2023年腾讯云2核4G配置服务器性价比怎么样?性价比高&#xff01;CPU具有100%计算性能&#xff0c;而且双11优惠价一年166元&#xff0c;三年566元&#xff0c;性价比超级高&#xff01; 2023腾讯云双11优惠活动&#xff1a;轻量2核4G5M服务器166.6元/年&#xff0c;3年566.6元…

达梦:开启sql日志记录

前言 开启sql日志记录&#xff0c;可协助排查定位数据库问题。生产开启会有一定的性能消耗&#xff0c;建议打开 SQL 日志异步刷盘功能 1.配置sqllog.ini文件 sqllog.ini 用于 SQL 日志的配置&#xff0c;当且仅当 INI 参数 SVR_LOG1 时使用。 运行中的数据库实例&#xff0c;可…

GoLong的学习之路(三)语法之运算符

书接上回&#xff0c;我们进展到了GoLong的基本数据类型&#xff0c;接下来说运算符&#xff08;其实和常见的编程语言的逻辑规则一样&#xff09; 运算符 运算符用于在程序运行时执行数学或逻辑运算。&#xff08;不可谓不重要&#xff09; Go 语言内置的运算符有&#xff…

如何做好建筑翻译呢

近年来&#xff0c;随着跨国工程项目增加&#xff0c;建筑翻译也越来越受到重视。尤其是建筑图纸翻译在工程设计、规划和施工等方面都具有重要意义。那么&#xff0c;如何做好建筑翻译呢&#xff0c;建筑工程翻译哪个比较正规&#xff1f; 在建筑行业日新月异的发展中&#xff…

leetcode第80题:删除有序数组中的重复项 II

题目描述 给你一个有序数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使得出现次数超过两次的元素只出现两次 &#xff0c;返回删除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件下完成。 …

orb-slam3编译手册(Ubuntu20.04)

orb-slam3编译手册&#xff08;Ubuntu20.04&#xff09; 一、环境要求1.安装git2.安装g3.安装CMake4.安装vi编辑器 二、源代码下载三、依赖库下载1.Eigen安装2.Pangolin安装3.opencv安装4.安装Python & libssl-dev5.安装boost库 三、安装orb-slam3四、数据集下载及测试 写在…

javascript原生态xhr上传多个图片,可预览和修改上传图片为固定尺寸比例,防恶意代码,加后端php处理图片

//前端上传文件 <!DOCTYPE html> <html xmlns"http://www.w3.org/1999/xhtml" lang"UTF-8"></html> <html><head><meta http-equiv"Content-Type" content"text/html;charsetUTF-8;"/><title…

权限系统设计(转载)

1 为什么需要权限管理 2 权限模型 2.1 权限设计 2.2 为什么需要角色 2.3 权限模型的演进 2.4 用户划分 2.5 理想的RBAC模型 3 权限系统表设计 3.1 标准RBAC模型表设计 3.2 理想RBAC模型表设计 4 结语 1 为什么需要权限管理 日常工作中权限的问题时时刻刻伴随着我们&a…

出差学小白知识No5:|Ubuntu上关联GitLab账号并下载项目(ssh key配置)

1 注冊自己的gitlab账户 有手就行 2 ubuntu安装git &#xff0c;并查看版本 sudo apt-get install git git --version 3 vim ~/.ssh/config Host gitlab.example.com User your_username Port 22 IdentityFile ~/.ssh/id_rsa PreferredAuthentications publickey 替换gitl…