大数据分析与应用实验任务十

大数据分析与应用实验任务十

实验目的:

  • 通过实验掌握spark SQL的基本编程方法;

  • 熟悉RDD到DataFrame的转化方法;

  • 通过实验熟悉spark SQL管理不同数据源的方法。

实验任务:

进入pyspark实验环境,在桌面环境打开jupyter notebook,或者打开命令行窗口,输入pyspark,完成下列任务:

实验一、参考教材5.3-5.6节各个例程编写代码,逐行理解并运行。
1. DataFrame 的创建

在编写独立应用程序时,可以通过如下语句创建一个 SparkSession 对象:

from pyspark import SparkContext,SparkConf 
from pyspark.sql import SparkSession 
sparklzy = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

读取在“/usr/local/spark/examples/src/main/resources/”目录下的样例数据 people.json

dfluozhongye = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
dfluozhongye.show()

image-20231130112410232

2. DataFrame 的保存
peopleDFlzy = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json") peopleDFlzy.select("name", "age").write.format("json").save("file:///root/Desktop/luozhongye/newpeople.json")peopleDFlzy.select("name").write.format("text").save("file:///root/Desktop/luozhongye/newpeople.txt")

image-20231130112652757

如果要再次读取 newpeople.json 中的数据生成 DataFrame,可以直接使用 newpeople.json 目录名称,而不需要使用 part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json 文件(当然,使用这个文件也可以),代码如下:

peopleDFlzy = spark.read.format("json").load("file:///root/Desktop/luozhongye/newpeople.json") 
peopleDFlzy.show()

image-20231130112748692

3. DataFrame 的常用操作

创建好DataFrame以后,可以执行一些常用的DataFrame操作,包括printSchema()、select()、filter()、groupBy()和 sort()等。在执行这些操作之前,先创建一个 DataFrame:

dflzy=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
(1) printSchema()

可以使用 printSchema()操作打印出 DataFrame 的模式(Schema)信息

dflzy.printSchema()

image-20231130112956579

(2) select()

select()操作的功能是从 DataFrame 中选取部分列的数据。

# select()操作选取了 name和 age 这两个列,并且把 age 这个列的值增加 1。
dflzy.select(dflzy['name'],dflzy['age']+1,).show()

image-20231130113032027

(3) filter()

filter()操作可以实现条件查询,找到满足条件要求的记录。

# 用于查询所有 age 字段的值大于 20 的记录。
dflzy.filter(dflzy["age"]>20)

image-20231130113111358

(4) groupBy()

groupBy()操作用于对记录进行分组。

# 根据 age 字段进行分组,并对每个分组中包含的记录数量进行统计
dflzy.groupBy("age").count().show()

image-20231130113200672

(5) sort()

sort()操作用于对记录进行排序。

# 表示根据 age 字段进行降序排序;
dflzy.sort(dflzy["age"].desc()).show()
# 表示根据 age 字段进行降序排序,当 age 字段的值相同时,再根据 name 字段的值进行升序排序
dflzy.sort(dflzy["age"].desc(),dflzy["name"].asc()).show()

image-20231130113655460

4. 从 RDD 转换得到 DataFrame
(1) 利用反射机制推断 RDD 模式

把 “/usr/local/spark/examples/src/main/resources/”目录下的people.txt 加载到内存中生成一个 DataFrame,并查询其中的数据。完整的代码及其执行过程如下:

from pyspark.sql import Rowpeople = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line: line.split(",")).map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询使用
schemaPeople.createOrReplaceTempView("people")
personsDF = spark.sql("select name,age from people where age > 20")
# DataFrame 中的每个元素都是一行记录,包含 name 和 age 两个字段,分别用 p.name 和 p.age 来获取值
personsRDD = personsDF.rdd.map(lambda p: "Name: " + p.name + "," + "Age: " + str(p.age))
personsRDD.foreach(print)

image-20231130113902840

(2)使用编程方式定义 RDD 模式

利用 Spark SQL 查询 people.txt

from pyspark.sql.types import *
from pyspark.sql import Row# 下面生成“表头”
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name inschemaString.split(" ")]
schema = StructType(fields)
# 下面生成“表中的记录
lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda x: x.split(","))
people = parts.map(lambda p: Row(p[0], p[1].strip()))
# 下面把“表头”和“表中的记录”拼装在一起
schemaPeople = spark.createDataFrame(people, schema)
# 注册一个临时表供后面的查询使用
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name,age FROM people")
results.show()

image-20231130114042174

实验二、完成p113页实验内容第1题(spark SQL基本操作),另注意自行修改题目中的数据。
1. Spark SQL 基本操作

将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1 , "name":" Ella" , "age":36 } 
{ "id":2, "name":"Bob","age":29 } 
{ "id":3 , "name":"Jack","age":29 } 
{ "id":4 , "name":"Jim","age":28 } 
{ "id":4 , "name":"Jim","age":28 } 
{ "id":5 , "name":"Damon" } 
{ "id":5 , "name":"Damon" }
{ "id":6 , "name":"罗忠烨" }

为 employee.json 创建 DataFrame,并编写 Python 语句完成下列操作:

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("SparkSQLBasicOperations").getOrCreate()# 读取 JSON 文件并创建 DataFrame
employee_dflzy = spark.read.json("/root/Desktop/luozhongye/employee.json")

(1)查询所有数据;

employee_dflzy.show()

image-20231130114320725

(2)查询所有数据,并去除重复的数据;

employee_dflzy.dropDuplicates().show()

image-20231130114352878

(3)查询所有数据,打印时去除 id 字段;

employee_dflzy.select("name", "age").show()

image-20231130114426647

(4)筛选出 age>30 的记录;

employee_dflzy.filter(employee_dflzy["age"] > 30).show()

image-20231130114455985

(5)将数据按 age 分组;

employee_dflzy.groupBy("age").count().show()

image-20231130114520305

(6)将数据按 name 升序排列;

employee_dflzy.orderBy("name").show()

image-20231130114547358

(7)取出前 3 行数据;

employee_dflzy.limit(3).show()

image-20231130114612796

(8)查询所有记录的 name 列,并为其取别名为 username;

employee_dflzy.select("name").withColumnRenamed("name", "username").show()

image-20231130114635538

(9)查询年龄 age 的平均值;

employee_dflzy.agg({"age": "avg"}).show()

image-20231130114703990

(10)查询年龄 age 的最小值。

employee_dflzy.agg({"age": "min"}).show()

image-20231130114736776

(11)停止 SparkSession

spark.stop()
2. 编程实现将 RDD 转换为 DataFrame

源文件employee.txt内容如下(包含 id,name,age):

1,Ella,36 
2,Bob,29 
3,Jack,29

实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 创建 SparkSession
spark = SparkSession.builder.appName("RDDtoDataFrame").getOrCreate()# 读取文本文件并创建 RDD
rdd = spark.sparkContext.textFile("/root/Desktop/luozhongye/employee.txt")# 定义数据模式
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])# 将 RDD 转换为 DataFrame
employee_df = rdd.map(lambda line: line.split(",")).map(lambda x: (int(x[0]), x[1], int(x[2]))).toDF(schema=schema)# 打印 DataFrame 的所有数据
employee_df.show(truncate=False)# 停止 SparkSession
spark.stop()

image-20231130120104859

3. 编程实现利用 DataFrame 读写 MySQL 的数据
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee, 包含下表 所示的两行数据。
idnamegenderage
1AliceF22
2JohnM25
-- 创建数据库
CREATE DATABASE IF NOT EXISTS sparktest;-- 切换到 sparktest 数据库
USE sparktest;-- 创建 employee 表
CREATE TABLE IF NOT EXISTS employee (id INT PRIMARY KEY,name VARCHAR(255),gender CHAR(1),age INT
);-- 插入数据
INSERT INTO employee VALUES (1, 'Alice', 'F', 22), (2, 'John', 'M', 25);
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入表 5-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
idnamegenderage
3MaryF26
4TomM23
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame# 创建 SparkSession
#"/path/to/mysql-connector-java-x.x.xx.jar":实际的 MySQL Connector/J JAR 文件路径。
spark = SparkSession.builder.appName("MySQLDataFrame").config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar" 
).getOrCreate()# 读取数据到 DataFrame
employee_data = [(3, 'Mary', 'F', 26), (4, 'Tom', 'M', 23)]
columns = ["id", "name", "gender", "age"]
new_data_df = spark.createDataFrame(employee_data, columns)# 配置 MySQL 连接信息
mysql_url = "jdbc:mysql://localhost:3306/sparktest"
mysql_properties = {"user": "your_username",# 实际的 MySQL 用户名"password": "your_password",# 实际的 MySQL 密码"driver": "com.mysql.cj.jdbc.Driver"
}# 将数据写入 MySQL 表
new_data_df.write.jdbc(url=mysql_url, table="employee", mode="append", properties=mysql_properties)# 从 MySQL 中读取数据到 DataFrame
employee_df = spark.read.jdbc(url=mysql_url, table="employee", properties=mysql_properties)# 打印 DataFrame 的所有数据
employee_df.show()# 打印 age 的最大值和总和
employee_df.agg({"age": "max", "age": "sum"}).show()# 停止 SparkSession
spark.stop()

本文结束欢迎点赞,收藏,有问题可以在评论区讨论!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/207071.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mockito加junit gd 单元测试 笔记

目录 一、简介1.1 单元测试的特点1.2 mock类框架使用场景1.3 常用mock类框架1.3.1 mockito1.3.2 easymock1.3.3 powermock1.3.4 JMockit 二、mockito的单独使用2.1 mock对象与spy对象2.2 初始化mock/spy对象的方式2.3 参数匹配2.4 方法插桩2.5 InjectMocks注解的使用断言工具 三…

数字图像处理(实践篇)二 画出图像中目标的轮廓

目录 一 涉及的OpenCV函数 二 代码 三 效果图 一 涉及的OpenCV函数 contours, hierarchy cv2.findContours(image, mode, method[, contours[, hierarchy[, offset ]]]) image:源图像。mode:轮廓的检索方式。cv2.RETR_EXTERNAL(只检测…

hive创建ES外部表过程中的问题

一、缺少jar包:httpclient 报错: “HiveServer2-Handler-Pool: Thread-696” java.lang.NoClassDefFoundError: org/apache/commons/httpclient/protocol/ProtocolSocketFactory 需要加载commons-httpclient-3.1.jar 二、缺少jar包:eshado…

Vue.js 组件生命周期

Vue.js 组件生命周期 生命周期函数(钩子)就是给我们提供了一些特定的时刻,让我们可以在这个周期段内加入自己的代码,做一些需要的事情; 生命周期钩子中的this指向是VM 或 组件实例对象 在JS 中,函数的执行上下文&#…

Kubernetes 安全最佳实践:保护您的秘密

Kubernetes 是一个可用于微服务的开源容器编排平台。当我们想要部署容器化应用程序、自动化管理和扩展应用程序时,Kubernetes 非常有用。 在容器中运行单个微服务而不是在同一虚拟机中运行多个进程几乎总是更安全。每当我们在 Kubernetes 中启动任何 pod 时&#x…

AndroidStudio2022.3.1 Patch3使用国内下载源加速

记录一下这个版本的as在使用国内下载源加速碰到的诸多问题。 一、gradle-8.0-bin.zip下载慢 编辑项目文件夹/gradle/wrapper/gradle-wrapper.properties,文件内容改为如下: #Fri Nov 24 18:50:06 CST 2023 distributionBaseGRADLE_USER_HOME distribu…

linux服务器环境搭建(使用yum 安装mysql、jdk、redis)

一:yum的安装 1:下载yum安装包并解压 wget http://yum.baseurl.org/download/3.2/yum-3.2.28.tar.gz tar xvf yum-3.2.28.tar.gz 2.进入yum-3.2.28文件夹中进行安装,执行安装指令 cd yum-3.2.28 sudo apt install yum 3.更新版本 yum check-update yum update yum cle…

Open3D 点对点的ICP配准算法

一、主要函数 1、该类TransformationEstimationPointToPoint提供用于计算点对点ICP目标函数的残差和雅可比矩阵的函数。函数registration_icp将其作为参数并运行点对点ICP以获得结果。 2、该函数evaluate_registration计算两个主要指标。fitness计算重叠区域(内点对…

服务器之间的conda环境迁移

有的时候python环境中可能包含了我们编译好的很多库文件,如果在别的服务器想直接使用环境就会比较困难些。最好的办法就是直接迁移环境。而传统的迁移方法导出“*.yaml”环境配置的这种方法,实际是需要重新安装环境,对于这种安装好的环境是不…

【排序,直接插入排序 折半插入排序 希尔插入排序】

文章目录 排序排序方法的分类插入排序直接插入排序折半插入排序希尔插入排序 排序 将一组杂乱无章的数据按照一定规律排列起来。将无序序列排成一个有序序列。 排序方法的分类 储存介质: 内部排序:数据量不大,数据在内存,无需…

PHP在线日语学习平台

有需要请加文章底部Q哦 可远程调试 PHP在线日语学习平台 一 介绍 此日语学习平台基于原生PHP开发,数据库mysql。系统角色分为用户和管理员。(附带参考设计文档) 技术栈:phpmysqlphpstudyvscode 二 功能 学生 1 注册/登录/注销 2 个人中心 3 查看课程…

首次部署Linux系统的经历

我是一名电子信息工程专业的学生,有次在图书馆上自习的时候无意间看到其他同学的电脑屏幕,黑色的屏幕上显示着一行一行的代码,勾起了我无限的好奇,经过询问得知他是用的Linux操作系统,是和Windows完全不同的系统&#…

记RocketMQ本地开发环境搭建始末

前言 最近工作中涉及到了RocketMQ的应用,为方便开发决定本地搭建一套RocketMQ的使用环境。 果然实践是个好东西... VMware虚拟环境搭建 这个网上有很多教程,只会比我写的详细有条理,这里就不在赘述了。 虚拟机搭建好之后每次重启电脑都无…

智能优化算法应用:基于风驱动算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于风驱动算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于风驱动算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.风驱动算法4.实验参数设定5.算法结果6.参考文献7.…

vue3实现element table缓存滚动条

背景 对于后台管理系统,数据的展示形式大多都是通过表格,常常会出现的一种场景,从表格跳到二级页面,再返回上一页时,需要缓存当前的页码和滚动条的位置,以为使用keep-alive就能实现这两种诉求,…

threeJs引入模型使用3D模型(vite+React+Ts)

要在 Three.js 中使用 3D 模型,你需要加载模型文件并将其添加到场景中。Three.js 支持多种不同的模型格式,比如 OBJ、FBX、GLTF 等。 init vitelatest //创建一个vite的脚手架 选择react并配置Ts 安装three.js准备 npm install react-three/drei np…

阿里云新版公共实例从注册账号到创建设备生成参数教程

1 注册阿里云 打开阿里云官网,点击右上角的登录/注册 打开的界面按照图片输入手机号注册 注册成功后,登录返回第一次打开的界面,点击控制台 点击控制台后界面如下 点击左上角的菜单,弹出新窗口,搜索物联网平台 开通物…

Wireshark之Intro, HTTP, DNS

源码地址👇 moranzcw/Computer-Networking-A-Top-Down-Approach-NOTES: 《计算机网络-自顶向下方法(原书第6版)》编程作业,Wireshark实验文档的翻译和解答。 (github.com) 目录 🌼Introduce 🎧前置 🎧过…

MySQL之 InnoDB逻辑存储结构

InnoDB逻辑存储结构 InnoDB将所有数据都存放在表空间中,表空间又由段(segment)、区(extent)、页(page)组成。InnoDB存储引擎的逻辑存储结构大致如下图。下面我们就一个个来看看。 页&#xff08…

数据结构-二叉树(1)

1.树概念及结构 1.1树的概念 树是一种非线性的数据结构,它是由n(n>0)个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树,也就是说它是根朝上,而叶朝下的。 1.有一个特殊的结点&…