【大数据技术】Spark分布式实现词频统计(hadoop+python+spark)

Spark分布式实现词频统计(hadoop+python+spark)

  • 搭建完全分布式高可用大数据集群(VMware+CentOS+FinalShell)

  • 搭建完全分布式高可用大数据集群(Hadoop+MapReduce+Yarn)

  • 本机PyCharm远程连接CentOS虚拟机(Python)

  • 搭建完全分布式高可用大数据集群(Scala+Spark)

在阅读本文前,请确保已经阅读过以上4篇文章,成功搭建了Hadoop+MapReduce+Yarn+Python+Spark的大数据集群环境。

写在前面

本文主要介绍基于hadoop+spark技术,自己编写python代码实现单词词频统计的详细步骤。

  • 电脑系统:Windows

  • 技术需求:HadoopMapReduceYarnPythonSpark

  • 使用软件:VMwareFinalShellPyCharm

注:本文的所有操作均在虚拟机master中进行,不涉及另外两台虚拟机。

启动Hadoop

  1. 使用finalshell连接并启动masterslave01slave02三台虚拟机。

  2. 在虚拟机master的终端输入命令start-all.sh启动hadoop、mapreduce和yarn。

  3. 随后可以用命令jps查看是否成功启动集群。

准备数据

注意:该部分的数据文件为/data/word.txt,如果做过之前的案例,已经拥有该数据文件,可以跳过该部分。

  1. 创建文本数据

① 在虚拟机master的终端输入命令mkdir /data创建一个/data目录。

00

② 在虚拟机master的终端输入命令 vi /data/word.txt 创建并打开word.txt文件,填入以下内容。

hello world
hello hadoop
hello hdfs
hello yarn

01

  1. 创建目录

① 在终端输入以下命令,可以在HDFS中创建/wordcount/input目录,用于存放文件word.txt

hdfs dfs -mkdir -p /wordcount/input

② 在终端输入以下命令验证是否创建/wordcount/input目录。

hdfs dfs -ls /

02

  1. 上传文件

① 在终端执行以下命令将文件word.txt上传到HDFS的/wordcount/input目录。

hdfs dfs -put /data/word.txt /wordcount/input

② 在终端输入以下命令验证是否成功将文件word.txt上传到HDFS的/wordcount/input目录。

hdfs dfs -ls /wordcount/input

03

③ 可以使用以下命令查看上传的word.txt文件的内容。

hdfs dfs -cat /wordcount/input/word.txt

04

④ 也可以通过HDFS的Web UI(http://master:9870)查看文件word.txt是否上传成功。

05

编写Python脚本

打开PyCharm专业版,远程连接虚拟机master,创建脚本/wordcount/wordspark.py,填入以下代码。

from pyspark.sql import SparkSession# 创建 SparkSession,连接到 Hadoop 集群
spark = SparkSession.builder \.appName("WordCount") \.getOrCreate()# 从 HDFS 读取输入文件
text_file = spark.sparkContext.textFile("hdfs://master:9000/wordcount/input/word.txt")# 计算词频
counts = text_file.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b)# 将结果保存到 HDFS
counts.saveAsTextFile("hdfs://master:9000/wordcount/output")# 停止 SparkSession
spark.stop()

这段代码是一个典型的使用 PySpark 实现的词频统计程序,具体分析如下。

  1. 导入 PySpark 模块
from pyspark.sql import SparkSession
  • SparkSession 是 PySpark 中的入口点,用于创建和配置 Spark 应用程序。SparkSession 提供了多种方法,允许我们与 Spark 集群进行交互,包括读取数据、执行转换、管理 Spark 作业等。
  1. 创建 SparkSession
spark = SparkSession.builder \.appName("WordCount") \.getOrCreate()
  • SparkSession.builder 是用来配置并构建一个 SparkSession 实例。通过 appName 方法,给当前的 Spark 应用程序指定一个名称(这里是 “WordCount”)。getOrCreate() 会返回一个现有的 SparkSession 或创建一个新的实例,如果 Spark 会话已经存在,它将返回该会话。
  1. 读取输入文件
text_file = spark.sparkContext.textFile("hdfs://master:9000/wordcount/input/input.txt")
  • sparkContext.textFile() 用于读取文本文件,并将其分割成多个行(行级数据)。这里的输入文件位于 HDFS 路径 hdfs://master:9000/wordcount/input/input.txt
  • sparkContext 是 SparkSession 的底层对象,它是与底层 Spark 集群进行交互的接口。
  • HDFS(Hadoop 分布式文件系统)作为分布式存储系统,存储着待处理的文件数据。
  1. 计算词频
counts = text_file.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b)
  • flatMap(lambda line: line.split(" "))
    • flatMap 是一种转换操作,它会将每一行的文本通过空格分割成多个单词,并返回一个由单词构成的平坦化列表。例如,一行文本 "hello world" 会变成 ["hello", "world"]
    • flatMap 的特点是会扁平化返回的列表,生成的 RDD(弹性分布式数据集)将包含所有的单词。
  • map(lambda word: (word, 1))
    • map 操作对每个单词进行转换,返回一个键值对 (word, 1),其中 word 是单词,1 表示出现的次数。
    • 这样,对于每个单词,都会创建一个键值对,后续会对相同的单词进行聚合操作。
  • reduceByKey(lambda a, b: a + b)
    • reduceByKey 是对相同键(单词)进行归约操作。它会将具有相同键的所有值(次数)加起来,得到每个单词的总词频。
    • lambda a, b: a + b 表示对于同一单词的多个 1 值,执行求和操作。
  1. 保存结果
counts.saveAsTextFile("hdfs://master:9000/wordcount/output")
  • saveAsTextFile() 方法将结果保存到指定路径。在此,计算得到的词频统计结果会被保存到 HDFS 路径 hdfs://master:9000/wordcount/output
  • 结果会以文本文件的形式保存,每个文件包含一部分输出数据,Spark 会自动将结果分布在多个文件中。
  1. 停止 SparkSession
spark.stop()
  • stop() 方法用于停止当前的 SparkSession,这样可以释放占用的资源。
  • 停止 SparkSession 是良好的实践,特别是在处理完 Spark 作业后,防止资源泄漏。

总的来说,这段代码完成了一个简单的分布式词频统计任务,其基本步骤包括:

  1. 初始化 SparkSession。
  2. 从 HDFS 中读取输入数据。
  3. 对输入数据进行词频统计:拆分单词、生成键值对、按键聚合计算词频。
  4. 将统计结果保存回 HDFS。
  5. 最后关闭 SparkSession,释放资源。

这种类型的作业常见于大数据处理和日志分析等场景。通过 Spark 的分布式计算能力,能够高效地处理大量文本数据并进行复杂计算。

运行Python脚本

注意:运行Python脚本前请确保已经启动hadoop集群。

  1. 输入以下命令查看虚拟机是否有pip工具。
pip --version

06

注意:在虚拟机master中输入命令pip --version ,如果提示没有pip,请根据提示安装pip。

07

  1. 输入以下命令安装pyspark库。
pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple

08

  1. 在PyCharm中运行wordspark.py程序。

09

  1. 在HDFS的Web UI(http://master:9870/explorer.html#/wordcount/output)中查看程序运行结果。

注意:有part-00000part-00001两个文件,因为是分布式存储。

10

11

运行Spark程序

注意:建议在运行spark程序前,三台虚拟机的配置为:

  • master:8G内存,4个CPU
  • slave01:4G内存,2个CPU
  • slave02:4G内存,2个CPU

  1. 在运行Spark程序前,请先删除http://master:9870/explorer.html#/wordcount/output目录。

12

  1. 输入以下命令关闭HDFS的安全模式。
hdfs dfsadmin -safemode leave

13

  1. 输入以下命令运行spark代码。

注意:运行前请确保HDFS中/wordcount/output文件不存在,如果存在,请将其删除。

spark-submit --master yarn /opt/python/code/wordcount/wordspark.py
  • spark-submit :这是启动 Spark 应用程序的命令。无论你的应用程序是使用 Scala、Java、Python 还是 R 编写的,都需要通过这个命令来提交。

  • --master yarn :这个参数指定了要使用的集群管理器(master)。在这里指定的是 YARN (Yet Another Resource Negotiator),这意味着你希望在配置为使用 YARN 作为资源管理器的 Hadoop 集群上运行此 Spark 应用程序。YARN 负责管理集群中的资源(如内存、CPU等)以及调度任务。

  • /opt/python/code/wordcount/wordspark.py :这是你想要运行的 Spark 应用程序的入口脚本。在这个例子中,它是一个 Python 文件,实现了 WordCount 算法,通常用于计算输入数据集中每个单词出现的次数。WordCount 是一个经典的入门示例,常用来展示大数据处理框架的基本使用方法。

总结一下,这条命令的作用是告诉 Spark 以客户端模式向 YARN 集群提交 wordspark.py 这个 Spark 应用程序,并由 YARN 来负责分配资源和调度作业执行。

14

  1. 重新启动安全模式。
hdfs dfsadmin -safemode enter

查看程序运行状态和结果

  1. 程序运行过程中,可以使用浏览器访问Spark的Web UI(http://master:4040/jobs/)查看程序的运行状态。

15

  1. 程序运行过程中,也可以使用浏览器访问YARN的Web UI(http://master:8088)查看程序的运行状态。

16

  1. 程序运行结束后,可以在HDFS的Web UI(http://master:9870)查看词频统计的结果。

17

  1. 当然,也可以在master的终端输入以下命令查看程序运行结果。
hdfs dfs -cat /wordcount/output/part-00000
hdfs dfs -cat /wordcount/output/part-00001

18

写在后面

本文仅供学习使用,原创文章,请勿转载,谢谢配合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/14573.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

28.<Spring博客系统⑤(部署的整个过程(CentOS))>

引入依赖 Spring-boot-maven-plugin 用maven进行打包的时候必须用到这个插件。看看自己pom.xml中有没有这个插件 并且看看配置正确不正常。 注&#xff1a;我们这个项目打的jar包在30MB左右。 <plugin><groupId>org.springframework.boot</groupId><artif…

C++Primer学习(2.2)

2.2 变量 变量提供一个具名的、可供程序操作的存储空间。C中的每个变量都有其数据类型,数据类型决定着变量所占内存空间的大小和布局方式、该空间能存储的值的范围&#xff0c;以及变量能参与的运算。对C程序员来说,“变量(variable)”和“对象(object)”一般可以互换使用。 术…

电脑开机提示按f1原因分析及终极解决方法来了

经常有网友问到一个问题&#xff0c;我电脑开机后提示按f1怎么解决&#xff1f;不管理是台式电脑&#xff0c;还是笔记本&#xff0c;都有可能会遇到开机需要按F1&#xff0c;才能进入系统的问题&#xff0c;引起这个问题的原因比较多&#xff0c;今天小编在这里给大家列举了比…

Linux系统命令无法使用(glib库相关问题)

1.背景描述 Yum强制安装了一些软件&#xff0c;安装软件成功无报错&#xff0c;完成后不久突然发现系统出问题了&#xff0c;所有的命令无法使用了&#xff0c;如ls、mv、cat等基本命令报错。 relocation error&#xff1a; /lib64/libpthread.so.0: symbol_libc_dl_error_tsd …

Jupyter Notebook自动保存失败等问题的解决

一、未生成配置文件 需要在命令行中&#xff0c;执行下面的命令自动生成配置文件 jupyter notebook --generate-config 执行后会在 C:\Users\用户名\.jupyter目录中生成文件 jupyter_notebook_config.py 二、在网页端打开Jupyter Notebook后文件保存失败&#xff1b;运行代码…

【含文档+PPT+源码】基于Python校园跑腿管理系统设计与实现

项目介绍 本课程演示的是一款基于Python校园跑腿管理系统设计与实现&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Python学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 3.…

TypeScript 中的联合类型:灵活的类型系统

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

WebStorm设置Vue Component模板

下载vue.js插件 下面有模板样例 Composition API&#xff1a;这是 Vue 3 的一项新特性&#xff0c;允许通过 setup 函数来组织组件逻辑。Options API&#xff1a;这是 Vue 2 和 Vue 3 都支持的传统方式&#xff0c;通过定义组件的 data、methods、computed 等来组织逻辑。 Comp…

解锁AI语音魅力——yoyo鹿鸣在线语音合成器,让创意声音即刻绽放!

yoyo鹿鸣-在线语音合成 人工智能语音克隆生成&#xff0c;二次元&#xff5e; AI工具 | AI探金 可以在AI探金社区来找我&#xff5e; yoyo鹿鸣 - 在线语音生成器 需求人群&#xff1a; 有语音合成需求的用户。 使用场景示例&#xff1a; 合成yoyo鹿鸣语音 等等 产品特色&a…

基于STM32的智能鱼缸水质净化系统设计

&#x1f91e;&#x1f91e;大家好&#xff0c;这里是5132单片机毕设设计项目分享&#xff0c;今天给大家分享的是智能鱼缸水质净化系统。 目录 1、设计要求 2、系统功能 3、演示视频和实物 4、系统设计框图 5、软件设计流程图 6、原理图 7、主程序 8、总结 1、设计要求…

t113-qt

修改QT配置: # # qmake configuration for building with arm-linux-gnueabi-g ## MAKEFILE_GENERATOR UNIX # CONFIG incremental # QMAKE_INCREMENTAL_STYLE sublib# include(../common/linux.conf) # include(../common/gcc-base-unix.conf) # inc…

apisix的real-ip插件使用说明

k8s集群入口一般都需要过负载均衡&#xff0c;然后再到apisix。 这时候如果后台业务需要获取客户端ip&#xff0c;可能拿到的是lb或者网关的内网ip。 这里一般要获取真实ip需要做几个处理。 1. 负载均衡上&#xff0c;一般支持配置获取真实ip参数&#xff0c;需要配置上。然…

[Meet DeepSeek] 如何顺畅使用DeepSeek?告别【服务器繁忙,请稍后再试。】

文章目录 [Meet DeepSeek] 如何顺畅使用DeepSeek&#xff1f;告别【服务器繁忙&#xff0c;请稍后再试。】引言使用渠道一&#xff1a;硅基流动 Chatbox AI【推荐】硅基流动 Chatbox AI的优势 使用渠道二&#xff1a;秘塔AI搜索秘塔AI搜索的优势 其它方案1. DeepSeek官网2. 纳…

四模型消融实验!DCS-CNN-BiLSTM-Attention系列四模型多变量时序预测

四模型消融实验&#xff01;DCS-CNN-BiLSTM-Attention系列四模型多变量时序预测 目录 四模型消融实验&#xff01;DCS-CNN-BiLSTM-Attention系列四模型多变量时序预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 基于DCS-CNN-BiLSTM-Attention、CNN-BiLSTM-Attention…

索引(MySQL)

1. 没有索引&#xff0c;可能会有什么问题 索引&#xff1a;提高数据库的性能&#xff0c;索引是物美价廉的东西了。不用加内存&#xff0c;不用改程序&#xff0c;不用调sql&#xff0c;只要执行 正确的 create index &#xff0c;查询速度就可能提高成百上千倍。但是天下没有…

Windows 实用设置工具 v3.6.5:一键优化系统设置

这款 Windows 实用设置工具 v3.6.5 是一款功能强大的系统优化软件&#xff0c;由 kernel 开发。它提供了丰富的系统设置选项&#xff0c;帮助用户轻松管理和优化 Windows 系统。以下是该工具的主要功能和特点&#xff1a; 主要功能 隐藏电脑文件夹 视频、文档、图片、音乐、下…

快速上手Vim的使用

Vim Linux编辑器-vim使用命令行模式下所有选项都可以带数字底行模式可视块模式&#xff08;ctrlV进入&#xff09; Linux编辑器-vim使用 Vim有多种模式的编辑器。能帮助我们很快的进行代码的编辑&#xff0c;甚至完成很多其他事情。 默认情况下我们打开vim在命令模式下&#x…

334递增的三元子序列贪心算法(思路解析+源码)

文章目录 题目思路解析源码总结题目 思路解析 有两种解法:解法一:动态规划(利用dp找到数组最长递增序列长度,判断是否大于3即可)本题不适用,因为时间复杂度为O(n^2),超时。 解法二:贪心算法:解法如上图,题目要求长度为三,设置第一个元素为长度1的值,是指长度二的…

sqli-labs靶场实录(二): Advanced Injections

sqli-labs靶场实录: Advanced Injections Less21Less22Less23探测注入点 Less24Less25联合注入使用符号替代 Less25aLess26逻辑符号绕过and/or过滤双写and/or绕过 Less26aLess27Less27aLess28Less28aLess29Less30Less31Less32&#xff08;宽字节注入&#xff09;Less33Less34Le…

Websocket从原理到实战

引言 WebSocket 是一种在单个 TCP 连接上进行全双工通信的网络协议&#xff0c;它使得客户端和服务器之间能够进行实时、双向的通信&#xff0c;既然是通信协议一定要从发展历史到协议内容到应用场景最后到实战全方位了解 发展历史 WebSocket 最初是为了解决 HTTP 协议在实时…