Spark编程实验三:Spark SQL编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":sc = SparkContext("local","Simple App")spark=SparkSession(sc)peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()rowRDD.createOrReplaceTempView("employee")personsDF = spark.sql("select * from employee")personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/223640.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【qt信号槽-5】信号槽相关注意事项记录

背景: 信号槽是qt很重要的概念,遇到问题帮助没少看。其中就有signals and slots这一章节,说得很到位。 概念琐碎,记录备忘。不对之处望指正。 【qt信号槽-1】槽函数重写问题,qt_metacall和qt_static_metacall-CSDN博…

百模大战中的AI行业:新趋势与未来发展

文章目录 每日一句正能量前言技术进步应用拓展行业变革人才竞争后记 每日一句正能量 人生最重要的价值是心灵的幸福,而不是任何身外之物。 前言 随着科技的迅猛发展,人工智能(AI)已经成为引领技术革命的重要驱动力之一。在当前的…

Jmeter、postman、python 三大主流技术如何操作数据库?

1、前言 只要是做测试工作的,必然会接触到数据库,数据库在工作中的主要应用场景包括但不限于以下: 功能测试中,涉及数据展示功能,需查库校验数据正确及完整性;例如商品搜索功能 自动化测试或性能测试中&a…

UnityHub无法打开项目问题,打开项目闪退回到hub界面

UnityHub无法打开项目问题,打开项目闪退回到hub界面 UnityHub启动项目闪烁unity界面之后立刻闪退到UnityHub界面情况一:这里这个问题我遇到了很多次情况都不太一样,我先说下我遇到的第一种问题也就是最好解决的一种。许可证到期导致闪退 情况…

怎么为pdf文件添加水印?

怎么为pdf文件添加水印?PDF是一种很好用的文件格式,这种格式能够很有效的保护我们的文件,但有时可能还会被破解,这种时候在PDF上添加水印就是比较好的方法。 综上所述,PDF是保密性很强的文件,但添加水印能够…

在Linux下探索MinIO存储服务如何远程上传文件

🌈个人主页:聆风吟 🔥系列专栏:网络奇遇记、Cpolar杂谈 🔖少年有梦不应止于心动,更要付诸行动。 文章目录 📋前言一. 创建Buckets和Access Keys二. Linux 安装Cpolar三. 创建连接MinIO服务公网地…

系列十一(面试)、如何查看JVM的参数?

一、查看JVM的参数 1.1、概述 上篇文章介绍了JVM的参数类型,通过jinfo可以查看JVM的默认参数,本章介绍另外一种查看JVM参数的方式。 1.2、 分类 JVM中提供了三种方式查看JVM的参数信息,这三种方式又分为两类,即:查看默…

04_线性表

线性表 顺序表顺序表的实现顺序表的遍历顺序表的容量可变顺序表的时间复杂度java中ArrayList实现 链表单向链表单向链表API设计java中LinkedList实现 链表的复杂度分析链表反转快慢指针中间值问题单向链表是否有环问题有环链表入口问题 循环链表约瑟夫问题 栈栈概述生活中的栈计…

HTTP前端请求

目录 HTTP 请求1.请求组成2.请求方式与数据格式get 请求示例post 请求示例json 请求示例multipart 请求示例数据格式小结 3.表单3.1.作用与语法3.2.常见的表单项 4.session 原理5.jwt 原理 HTTP 请求 1.请求组成 请求由三部分组成 请求行请求头请求体 可以用 telnet 程序测…

持续集成交付CICD:GitLabCI 封装Python类 并结合 ArgoCD 完成前端项目应用发布

目录 一、实验 1. 环境 2. Python代码实现获取文件 3.Python代码实现创建文件 4.Python代码实现更新文件 5.GitLab更新库文件与运行流水线 6.ArgoCD 完成前端项目应用发布 二、问题 1.Python获取GitLab指定仓库文件报错 2. K8S master节点运行Python代码报错 一、实验…

【网络编程】网络通信基础——简述TCP/IP协议

个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【网络编程】【Java系列】 本专栏旨在分享学习网络编程的一点学习心得,欢迎大家在评论区交流讨论💌 目录 一、ip地…

C/C++ 基础函数

memcpy:C/C语言中的一个用于内存复制的函数,声明在 string.h 中(C是 cstring) void *memcpy(void *destin, void *source, unsigned n);作用是:以source指向的地址为起点,将连续的n个字节数据,…

如何利用PPT绘图并导出清晰图片

在写论文的过程中,免不了需要绘图,但是visio等软件绘图没有在ppt上绘图比较熟练,尤其流程图结构图. 但是ppt导出的图片也不够清晰,默认分辨率是96dpi,而杂志投稿一般要求至300dpi。解决办法如下: 1.打开注…

通过 Higress Wasm 插件 3 倍性能实现 Spring-cloud-gateway 功能

作者:韦鑫,Higress Committer,来自南京航空航天大学分布式系统实验室 导读:本文将和大家一同回顾 Spring Cloud Gateway 是如何满足 HTTP 请求/响应转换需求场景的,并为大家介绍在这种场景下使用 Higress 云原生网关的…

【银行测试】相关专业知识点+核心业务系统性能方法(汇总)

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 1、银行测试相关专…

HarmonyOS4.0系统性深入开发03UIAbility组件详解(中)

UIAbility组件基本用法 UIAbility组件的基本用法包括:指定UIAbility的启动页面以及获取UIAbility的上下文UIAbilityContext。 指定UIAbility的启动页面 应用中的UIAbility在启动过程中,需要指定启动页面,否则应用启动后会因为没有默认加载…

Html / CSS刷题笔记

WebKit是一个开源的浏览器引擎,它最初是由苹果公司开发的,并且被广泛用于Safari浏览器和其他基于WebKit的浏览器,比如Google Chrome的早期版本。它也是构建许多移动设备浏览器的基础。WebKit的主要功能是解析HTML和CSS,并将其渲染…

爬虫工作量由小到大的思维转变---<第二十三章 Scrapy开始很快,越来越慢(医病篇)>

诊断篇https://blog.csdn.net/m0_56758840/article/details/135170994?ops_request_misc%257B%2522request%255Fid%2522%253A%2522170333243316800180644102%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id1703332433168001806441…

面试题:JVM 对锁都进行了哪些优化?

文章目录 锁优化自旋锁和自适应自旋锁消除锁粗化逃逸分析方法逃逸线程逃逸通过逃逸分析,编译器对代码的优化 锁优化 jvm 在加锁的过程中,会采用自旋、自适应、锁消除、锁粗化等优化手段来提升代码执行效率。 自旋锁和自适应自旋 现在大多的处理器都是…

node-red:使用node-red-contrib-amqp节点,实现与RabbitMQ服务器(AMQP)的消息传递

node-red-contrib-amqp节点使用 一、简介1.1 什么是AMQP协议?1.2 什么是RabbitMQ? -> 开源的AMQP协议实现1.3 RabbitMQ的WEB管理界面介绍1.3 如何实现RabbitMQ的数据采集? -> node-red 二、node-red-contrib-amqp节点安装与使用教程2.1 节点安装2.2 节点使用2.2.1 amq…