【头歌实训】PySpark Streaming 数据源

文章目录

  • 第1关:MySQL 数据源
    • 任务描述
    • 相关知识
      • PySpark JDBC 概述
      • PySpark JDBC
      • PySpark Streaming JDBC
    • 编程要求
    • 测试说明
    • 答案代码
  • 第2关:Kafka 数据源
    • 任务描述
    • 相关知识
      • Kafka 概述
      • Kafka 使用基础
      • PySpark Streaming Kafka
    • 编程要求
    • 测试说明
    • 答案代码

第1关:MySQL 数据源

任务描述

本关任务:读取套接字流数据,完成词频统计,将结果写入 Mysql 中。

相关知识

为了完成本关任务,你需要掌握:

  1. PySpark JDBC 概述;
  2. PySpark JDBC;
  3. PySpark Streaming JDBC。

PySpark JDBC 概述

在 PySpark 中支持通过 JDBC 的方式连接到其他数据库获取数据生成 DataFrame,当然也同样可以使用 Spark SQL 去读写数据库。除了 JDBC 数据源外,还支持 ParquetJSONHive 等数据源。

PySpark JDBC

在学习 PySpark Streaming JDBC 之前,我们先来了解一下在 PySpark 中如何使用 JDBC。

需求:

  • 读取 Mysql 中的数据;
  • 往 Mysql 中写入数据。

首先,打开右侧命令行窗口,等待连接后,进入 MySQL,任意创建一个库,在该库中任意创建一张表,任意写入一些数据。

# 启动 mysql 服务
service mysql start
# 进入 mysql
mysql -uroot -p123123
# 创建 test 库
create database if not exists test;
# 创建表
use test;
create table if not exists student(
id int,
name varchar(50),
class varchar(50));
# 数据写入
insert into  student values(1,"zhangsan","A");
insert into  student values(2,"lisi","B");
insert into  student values(3,"wangwu","C");

创建完成后,进入 python3 shell 界面。

python3

,

开始编写程序,第一步,先导入相关包

from findspark import init
init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

第二步,创建 Spark 对象

spark = SparkSession.builder.appName("read_mysql").master("local[*]").getOrCreate()

第三步,读取 Mysql 中的数据

dataFrame = spark.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver")  .option("url", "jdbc:mysql://localhost:3306/test").option("dbtable", "student")  .option("user", "root").option("password", "123123").load()

第四步,输出读取的数据

# 注意,show() 方法默认只会显示前 20 行数据。
dataFrame.show()

输出结果如图所示: ,

第五步,将读取的数据以追加的方式写入库中

dataFrame.write.format("jdbc").option("driver", "com.mysql.jdbc.Driver")  .option("url", "jdbc:mysql://localhost:3306/test").option("dbtable", "student").option("user", "root").option("password", "123123").mode(saveMode="append").save()

进入 Mysql 中查看结果:

# 进入 Mysql
mysql -uroot -p123123
# 查询数据
select * from test.student;

,

PySpark Streaming JDBC

通过对 PySpark JDBC 的学习,我们了解了在 Python 中是如何使用 JDBC 的,现在来学习 PySpark Streaming JDBC 的连接方式。

需求:通过读取套接字流,进行词频统计,将数据写入 Mysql 中。

首先,打开右侧命令行窗口,等待连接后,进入 MySQL,创建 spark 库,在该库中创建 wordcount 表。

# 启动 mysql 服务
service mysql start
# 进入 mysql
mysql -uroot -p123123
# 创建 test 库
create database if not exists spark;
# 创建表
use spark;
create table if not exists wordcount(
word varchar(50),
count int);

创建完成后,进入主目录 /root,创建代码文件 mysql.py,对其进行编辑。

cd /root
vi mysql.py

开始编写程序,第一步,先导入相关包

from findspark import init
init()
import time
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

第二步,创建 Spark 环境与检查点

sc = SparkContext(appName="mysql_streaming", master="local[*]")
ssc = StreamingContext(sc, 10)
# 设置套接字流信息
inputStream = ssc.socketTextStream("localhost", 7777)
# 设置检查点
ssc.checkpoint("/usr/local/spark")

第三步,对数据进行相关操作

# 累加器(状态更新)
def updateFunction(newValues, runningCount):if runningCount is None:runningCount = 0return sum(newValues, runningCount)
pairs = inputStream.flatMap(lambda x: x.split(" ")).filter(lambda x: x != "").map(lambda x: (x, 1))
wordCounts = pairs.updateStateByKey(updateFunction)
wordCounts.pprint(100)

第四步,写入 Mysql 处理

def dbfunc(records):db = pymysql.connect("localhost", "root", "123123", "spark")cursor = db.cursor()def doinsert(p):sql = "insert into wordcount(word,count) values ('%s', '%s')" % (str(p[0]), str(p[1]))try:cursor.execute(sql)db.commit()except:db.rollback()for item in records:doinsert(item)
def func(rdd):repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(dbfunc)
wordCounts.foreachRDD(func=func)

第五步,启动与停止

ssc.start()
time.sleep(30)
ssc.stop()

第六步,新增一个命令行窗口,启动数据流服务

nc -l -p 7777

第七步,返回代码文件窗口,运行程序

python3 /root/mysql.py

第八步,程序启动后,切换到数据流服务窗口,输入如下数据:

hello pyspark
hello pyspark streaming
hello jdbc

程序结束后,进入 Mysql 中查看结果:

# 进入 Mysql
mysql -uroot -p123123
# 查询数据
select distinct(word),count from spark.wordcount;

结果如图所示:

,

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码,执行程序,读取套接字流数据,按空格进行分词,完成词频统计。在 Mysql 中创建 work 数据库,在该库中创建表 wordcount,添加字段 word(字符型),字段 count(整型),将词频统计结果写入该表中。

代码文件目录: /data/workspace/myshixun/project/step1/work.py

套接字流相关信息:

  • 地址:localhost
  • 端口:8888
  • 输入数据:

待程序启动后(5s),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 time.sleep(60) 来指定时间。

When summer comes, people like to go to the beach and play in the seawater.
It is such a good way to drive away the hotness.
But it has been reported that many people drawn while they were swimming on the beach. 
The people who died were good at swimming, the reason they got killed was the invisible demon under the seawater. 
In the afternoon, there are some vortexes under the seawater, which people can’t see. 
When people go swimming, they will be absorbed by the vortexes, even though they are good at swimming, they can’t resist the strong power.
So when we go to play in the beach, we must take care.

输入内容后,注意按回车。

Mysql 信息:

  • 账号:root
  • 密码:123123
  • 地址:localhost
  • 端口:3306

请在程序运行完成后再进行评测,否则会影响最终结果。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

from findspark import init
init()
import time
import pymysql
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext(appName="mysql_streaming", master="local[*]")ssc = StreamingContext(sc, 10)# 设置检查点
ssc.checkpoint("/usr/local/work")# 累加器(状态更新)
def updateFunction(newValues, runningCount):if runningCount is None:runningCount = 0return sum(newValues, runningCount)# 设置套接字流
############### Begin ###############
inputStream = ssc.socketTextStream("localhost", 8888)############### End ###############pairs = inputStream.flatMap(lambda x: x.split(" ")).filter(lambda x: x != "").map(lambda word: (word, 1))wordCounts = pairs.updateStateByKey(updateFunction)wordCounts.pprint(100)def dbfunc(records):# 根据传入的 records 参数,完成数据写入 Mysql 操作############### Begin ################ 连接 MySQL 数据库connection = pymysql.connect(host='localhost',user='root',password='123123',database='work',port=3306,)with connection.cursor() as cursor:# 根据传入的 records 参数,完成数据写入 Mysql 操作for record in records:word, count = recordcursor.execute('INSERT INTO wordcount (word, count) VALUES (%s, %s)', (word, count))connection.commit()connection.close()############### End ################ 分区设置
def func(rdd):repartitionedRDD = rdd.repartition(3)repartitionedRDD.foreachPartition(dbfunc)wordCounts.foreachRDD(func=func)ssc.start()time.sleep(60)ssc.stop() 

打开一个命令行窗口

# 启动 mysql 服务
service mysql start
# 进入 mysql
mysql -uroot -p123123
# 创建 test 库
create database if not exists work;
# 创建表
use work;
create table if not exists wordcount(word varchar(50),count int
);
# 退出 mysql
exit
# 创建检查点目录
mkdir -p /usr/local/work/
nc -l -p 8888

再打开一个窗口

chmod 777 /data/workspace/myshixun/project/step1/work.py
python3 /data/workspace/myshixun/project/step1/work.py # 现在开始运行代码文件,请在 60 秒内创建文件并写入下面数据

回到第一个窗口,把下面数据粘贴上去再打一个回车

When summer comes, people like to go to the beach and play in the seawater.
It is such a good way to drive away the hotness.
But it has been reported that many people drawn while they were swimming on the beach. 
The people who died were good at swimming, the reason they got killed was the invisible demon under the seawater. 
In the afternoon, there are some vortexes under the seawater, which people can’t see. 
When people go swimming, they will be absorbed by the vortexes, even though they are good at swimming, they can’t resist the strong power.
So when we go to play in the beach, we must take care.

第2关:Kafka 数据源

任务描述

本关任务:读取 Kafka 生产的数据,完成输出。

相关知识

为了完成本关任务,你需要掌握:

  1. Kafka 概述;
  2. Kafka 使用基础;
  3. PySpark Streaming Kafka。

Kafka 概述

Kafka 就是一个分布式的用于消息存储的发布订阅模式的消息队列。一般用于大数据的流式处理中。 具有高水平扩展性、高容错性、访问速度快、分布式等特性,主要应用场景是日志收集系统和消息系统。但是随着 Kafka 的快速发展,也被应用于高性能数据管道、数据集成、流分析等。

img

Kafka 使用基础

在学习 Pyspark streaming Kafka 之前,我们先来学习一下 Kafka 的使用基础。

首先,打开右侧命令行窗口,等待连接后,启动 Kafka 服务

# kafka 依赖 zookeeper,所以需要先启动 zookeeper 服务
cd /opt/zookeeper
bin/zkServer.sh start conf/zoo.cfg
# 启动 Kafka 服务
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties

检查服务是否启动成功,输入 jps 后,出现如下所示,表示启动成功: ,

创建 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic first 

这个 topicfirst2181zookeeper 默认的端口号,partitiontopic 里面的分区数,replication-factor 是备份的数量,在 kafka 集群中使用,这里单机版就不用备份了。

查看当前服务器中的所有 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --list 

,

创建 producer 生产者

在 xxx 节点发送消息。

bin/kafka-console-producer.sh --broker-list xxx:9092 --topic first

创建 consumer 消费者

在 xxx 节点接收消息。

bin/kafka-console-consumer.sh --zookeeper xxx:2181 --from-beginning --topic first

删除 topic

bin/kafka-topics.sh --zookeeper master:2181 --delete --topic first

PySpark Streaming Kafka

通过对 Kafka 基础使用的学习,现在来通过一个案例学习在 PySpark Streaming 中如何连接 Kafka。

需求:消费 Kafka 生产的数据,完成输出。

首先,打开右侧命令行窗口,等待连接后,启动 Kafka 服务。

# 启动 zookeeper 服务
cd /opt/zookeeper
bin/zkServer.sh start conf/zoo.cfg
# 启动 Kafka 服务
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties

创建 topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test

新增一个命令行窗口,等待连接后,在 /root 目录下创建 test.py 程序文件

cd /root
touch test.py

编辑文件 test.py,编写程序,第一步,导入相关文件包。

from pyspark.sql import SparkSession

第二步,创建 Spark 环境

spark=SparkSession.builder.appName("kafka_stream").master("local[*]").getOrCreate()

第三步,创建 Kafka 数据流

在 pyspark 中,我们通过 KafkaUtils.createStream() 方法创建 Kafka 数据流,但该方法在 Spark 3.0 中以及弃用,现在采用 spark.readStream.format("kafka") 方法来创建 Kafka 数据流。

df = spark \.readStream \.format("kafka") \# 绑定 Kafka 生产地址.option("kafka.bootstrap.servers", "localhost:9092") \# 订阅 topic.option("subscribe", "test") \# 设置偏移量(最新).option("startingOffsets","latest") \.load()

第四步,收集数据

table = df.selectExpr("CAST(value AS STRING)")

第五步,输出到屏幕,启动程序

table.writeStream \# 指定监听间隔时间.trigger(processingTime='5 seconds') \# 输出方式.outputMode("append") \# 不将内容进行清空.option("truncate", "false")\.format("console") \.start() \# 60 秒后停止程序.awaitTermination(timeout=60)

编写完程序后,保存退出,切换到 Kafka 服务的命令行窗口,创建生产者。

cd /opt/kafka
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

第六步,返回编写程序文件的命令行窗口,运行程序

spark-submit --master local[*] --driver-class-path /opt/kafka/libs/kafka-clients-2.8.0.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --jars /opt/spark/jars/spark-streaming-kafka-0-10-assembly_2,12-3,0,2.jar --py-files test.zip test.py 

注意,运行程序前需要先压缩程序文件,压缩命令语法如下:

zip  压缩后文件名.zip  原文件名

第七步,程序启动后,切换到 Kafka 数据流服务窗口,输入如下数据:

hello kafka
hello pyspark streaming
I love big data

结果如图所示:

,

编程要求

打开右侧代码文件窗口,在 BeginEnd 区域补充代码。 在 Kafka 中创建一个 topic,作为一个生产者,完善程序,读取 Kafka 流数据并以 append 方式输出。通过 spark-submit 的方式运行代码文件,将输出信息保存到 /data/workspace/myshixun/project/step2/result.txt 结果文件中。

代码文件目录: /data/workspace/myshixun/project/step2/work.py

Kafka 相关信息:

  • Kafka 主目录:/opt/kafka
  • Zookeeper 主目录:/opt/zookeeper
  • Zookeeper 地址:localhost:2181

Kafka 输入内容:

程序启动后(15s左右),请在 60 秒内写入数据,如果需要调整时间,你可以通过修改代码文件中 .awaitTermination(timeout=60)timeout 指定时间。

Hello world!
Hello python!
Hello spark!
Hello Kafka!
I love bigdata.

提交命令:

注意压缩文件。

spark-submit --master local[*] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --py-files xxx.zip xxx.py > /data/workspace/myshixun/project/step2/result.txt

请等待程序运行完成后进行评测,否则会影响最终结果。

测试说明

平台将对你编写的代码进行评测,如果与预期结果一致,则通关,否则测试失败。

答案代码

from pyspark.sql import SparkSessionspark = SparkSession.builder.appName("kafka_stream").master("local[*]").getOrCreate()############### Begin ###############df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test") \.option("startingOffsets","latest") \.load()table = df.selectExpr("CAST(value AS STRING) as message")table.writeStream \.trigger(processingTime='5 seconds') \.outputMode("append") \.option("truncate", "false")\.format("console") \.start() \.awaitTermination(timeout=60) ############### Begin ###############

进入右侧命令行窗口

# kafka 依赖 zookeeper,所以需要先启动 zookeeper 服务
cd /opt/zookeeper
bin/zkServer.sh start conf/zoo.cfg
# 启动 Kafka 服务
cd /opt/kafka
bin/kafka-server-start.sh -daemon config/server.properties
# 创建 topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic test
# 创建 producer 生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

再打开一个命令行窗口

cd /data/workspace/myshixun/project/step2/
zip work.zip work.py
spark-submit --master local[*] --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2 --py-files work.zip work.py > result.txt

回到前一个命令行窗口,在程序启动 15s 左右时间后再填入下面数据,并且在 60s 内完成写入

Hello world!
Hello python!
Hello spark!
Hello Kafka!
I love bigdata.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/225749.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

netty trojan

参考代码:https://github.com/kdyzm/trojan-client-netty 参考博客: github代码作者的博客:https://blog.kdyzm.cn/post/71 trojan-go介绍:https://p4gefau1t.github.io/trojan-go/developer/trojan/ trojan协议介绍:h…

2014年第三届数学建模国际赛小美赛A题吹口哨解题全过程文档及程序

2014年第三届数学建模国际赛小美赛 A题 吹口哨 原题再现: 哨子是一种小装置,当空气被迫通过开口时会发出声音。哨声的巨大而引人注目,使其对警察和体育裁判来说至关重要。当救生员、迷路的露营者或犯罪受害者使用它们时,它们可以…

创建springboot项目

SpringBoot 就相当于不需要配置文件的SpringSpringMVC。 常用的框架和第三方库都已经配置好了。 maven安装配置 管理项目依赖库的 maven的安装教程网上有很多,这里简单记录一下。 官网下载maven后并解压。 在其目录下添加一个目录repository 然后在conf目录下…

怎么制作有时效的文件二维码?二维码加密、有效期的设置技巧

在制作文件二维码的时候,如何设置文件的有效期呢?一般将文件生成二维码查看能够避免在微信或者QQ云端保存有时间限制的问题,而且扫码阅读文件或者下载文件也更加的方便。那么如果我们想要做一个文件类型的二维码,但是想要设置或者…

QT QString中mid()、left()、right()函数

mid函数原型: QString QString::mid(int position, int n -1) const 返回一个从position开始,长度为n的QString 类型的子串。position不能超出字符串长度,否则返回null;当从position开始的子串长度不够n或n为-1(缺省…

list集合

List集合 List集合的概述 有序集合(也称之为序列),用户可以精确的控制列表中的每个元素的插入位置。用户可以通过整数索引访问元素,并搜索列表中的元素 与 Set 集合不同,列表通常允许重复的元素 List 集合的特点 有…

Redis分布式缓存之主从哨兵分片集群

Redis主从 数据同步原理 Redis哨兵 Redis分片集群 集群伸缩:在集群中插入或删除某个节点 集群故障转移

vue2、vue3状态管理之vuex、pinia

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、状态管理之vuex1.1 State调用:1.2 Mutation在vuex中定义:在组件中使用: 1.3 Action在vuex中定义:将上面的减…

性能测试-jemeter:安装 / 基础使用

一、理解jemeter 官网-Apache JMeter-Apache JMeter™ JMeter是一款开源的性能测试工具,主要用于模拟大量用户并发访问目标服务器,以评估服务器的性能和稳定性。 JMeter可以执行以下任务序号用途描述1性能测试通过模拟多个用户在同一时间对服务器进行…

【Graylog】通过Pipelines在Graylog生成IP地理位置信息

序 在当今数字化时代,随着网络攻击的不断增加和全球化的用户活动,了解IP地址的地理位置信息变得越来越重要。对于网络安全和营销策略来说,掌握IP地址的地理信息可以带来许多好处。 接下里将介绍如何通过Graylog的Pipelines功能,…

linux开放tomcat 8080端口

1、查看8080是否开放 firewall-cmd --query-port8080/tcp查看已开启的端口 firewall-cmd --list-ports开启防火墙 systemctl start firewalld2、永久开放8080端口 firewall-cmd --zonepublic --add-port8080/tcp --permanent3、重加载(重启防火墙) …

如何开发一个类似美团小程序商家入驻

上线类似美团的小程序可以推动商业生态系统的优化和升级。小程序可以连接商家、用户和平台,促进信息流通和交易,提高整个生态系统的效率和效益。今天,我们分享如何开发一个类似美团小程序商家入驻的平台。大家点个关注点个赞,我们…

本地登陆页面:对本地搜索词进行排名的策略

位置着陆页没有得到足够的尊重。 你用你的姓名、地址、电话号码和工作时间来设置它们。也许您嵌入了用于行车路线的 Google 地图。 也许你写了一些没人会读的副本,如果你有多个位置,你在每一页上重复相同的副本,只是更改位置名称。 如果你…

408数据结构错题知识点拾遗

个人向错题相关部分整理,涵盖真题、模拟、课后习题等。 408相关: 408数据结构错题知识点拾遗 408数据结构常考算法基础训练等待完善 408计算机组成原理错题知识点拾遗408操作系统错题知识点拾遗等待完善408计算机网络错题知识点拾遗 408计算机网络各层协…

DRF从入门到精通五(路由组件、认证组件、权限组件、频率组件及认证、权限源码分析)

文章目录 一、路由组件REST framework提供了两个routeraction装饰器 二、认证组件(Authentication)三、权限组件(Permissions)内置权限类 四、频率组件(Throttling)五、权限组件源码分析六、认证组件源码分析 一、路由组件 对于视图集ViewSetMixin,我们除了可以自己…

JavaWeb的Servlet的入门和使用方法

1 什么是Servlet Servlet是Server Applet的简称,是用Java编写的是运行在 Web 服务器上的程序,它是作为来自 Web 浏览器或其他 HTTP 客户端的请求和 HTTP 服务器上的数据库或应用程序之间的中间层。使用 Servlet,可以收集来自网页表单的用户输…

Elasticsearch:无需搜索 “Christmas” 即可找到有关圣诞节的书籍

随着假期的临近,我期待着变得舒适,拿起一本新书,享受轻松的时光。 但是使用搜索栏在线发现图书并不像看起来那么容易......大多数零售搜索引擎仅依赖于关键字搜索,当我们确切地知道我们正在寻找什么书名时,这很好&…

Servlet见解2

4 创建servlet的三种方式 4.1 实现Servlet接口的方式 import javax.servlet.*; import javax.servlet.annotation.WebServlet; import java.io.IOException;WebServlet("/test1") public class Servlet1 implements Servlet {Overridepublic void init(ServletConf…

【前端技术】Vite vs Webpack

✨专栏介绍 在当今数字化时代,Web应用程序已经成为了人们生活和工作中不可或缺的一部分。而要构建出令人印象深刻且功能强大的Web应用程序,就需要掌握一系列前端技术。前端技术涵盖了HTML、CSS和JavaScript等核心技术,以及各种框架、库和工具…

克魔助手工具下载、注册和登录指南

下载安装克魔助手 摘要 本文介绍了如何下载安装克魔助手工具,以及注册和登录流程。通过简单的步骤,用户可以轻松获取并使用该工具,为后续的手机应用管理操作做好准备。 引言 克魔助手是一款免费的手机管理工具,通过该工具用户…