【智能大数据分析 | 实验三】Storm实验:实时WordCountTopology

在这里插入图片描述

【作者主页】Francek Chen
【专栏介绍】 ⌈ ⌈ 智能大数据分析 ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘,以提取有价值的信息和洞察。它结合了大数据技术、人工智能(AI)、机器学习(ML)和数据挖掘等多种方法,旨在通过自动化的方式分析复杂数据集,发现潜在的价值和关联性,实现数据的自动化处理和分析,从而支持决策和优化业务流程。与传统的人工分析相比,智能大数据分析具有自动化、深度挖掘、实时性和可视化等特点。智能大数据分析广泛应用于各个领域,包括金融服务、医疗健康、零售、市场营销等,帮助企业做出更为精准的决策,提升竞争力。
【GitCode】专栏资源保存在我的GitCode仓库:https://gitcode.com/Morse_Chen/Intelligent_bigdata_analysis。

文章目录

    • 一、实验目的
    • 二、实验要求
    • 三、实验原理
      • (一)Topologies
      • (二)Spouts
      • (三)Bolts
    • 四、实验环境
    • 五、实验内容和步骤
      • (一)启动 Storm 集群
      • (二)导入依赖 jar 包
      • (三)编写程序
      • (四)打包上传并运行
    • 六、实验结果
    • 七、实验心得


一、实验目的

掌握如何用 Java 代码来实现 Storm 任务的拓扑,掌握一个拓扑中 Spout 和 Bolt 的关系及如何组织它们之间的关系,掌握如何将 Storm 任务提交到集群。

二、实验要求

编写一个 Storm 拓扑,一个 Spout 每个一秒钟随机生成一个单词并发射给 Bolt,Bolt 统计接收到的每个单词出现的频率并每隔一秒钟实时打印一次统计结果,最后将任务提交到集群运行,并通过日志查看任务运行结果。

三、实验原理

Storm 集群和 Hadoop 集群表面上看很类似。但是 Hadoop 上运行的是 MapReduce jobs,而在 Storm 上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个 MapReduce job 最终会结束, 而一个 topology 永远会运行(除非你手动 kill 掉)。

(一)Topologies

一个 topology 是spoutsbolts组成的图,通过 stream groupings 将图中的 spouts 和 bolts 连接起来,如图所示。

在这里插入图片描述
一个 topology 会一直运行直到你手动 kill 掉,Storm 自动重新分配执行失败的任务, 并且 Storm 可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。

运行一个 topology 很简单。首先,把你所有的代码以及所依赖的 jar 打进一个 jar 包。然后运行类似下面的这个命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这个命令会运行主类:backtype.strom.MyTopology,参数是arg1arg2。这个类的 main 函数定义这个 topology 并且把它提交给 Nimbus。storm jar负责连接到 Nimbus 并且上传 jar 包。

Topology 的定义是一个 Thrift 结构,并且 Nimbus 就是一个 Thrift 服务, 你可以提交由任何语言创建的 topology。上面的方面是用 JVM-based 语言提交的最简单的方法。

(二)Spouts

消息源 spout 是 Storm 里面一个 topology 里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向 topology 里面发出消息:tuple。Spout 可以是可靠的也可以是不可靠的。如果这个 tuple 没有被 storm 成功处理,可靠的消息源 spouts 可以重新发射一个 tuple, 但是不可靠的消息源 spouts 一旦发出一个 tuple 就不能重发了。

消息源可以发射多条消息流 stream。使用OutputFieldsDeclarer.declareStream来定义多个 stream,然后使用SpoutOutputCollector来发射指定的 stream。

Spout类里面最重要的方法是nextTuple。要么发射一个新的 tuple 到 topology 里面或者简单的返回如果已经没有新的 tuple。要注意的是 nextTuple 方法不能阻塞,因为 storm 在同一个线程上面调用所有消息源 spout 的方法。

另外两个比较重要的 spout 方法是ackfail。storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack,否则调用 fail。storm 只对可靠的 spout 调用 ack 和 fail。

(三)Bolts

所有的消息处理逻辑被封装在 bolts 里面。Bolts 可以做很多事情:过滤,聚合,查询数据库等等。Bolts 可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多 bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

Bolts 可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义 stream,使用OutputCollector.emit来选择要发射的 stream。

Bolts 的主要方法是execute,它以一个 tuple 作为输入,bolts 使用OutputCollector来发射 tuple,bolts 必须要为它处理的每一个 tuple 调用OutputCollector的 ack 方法,以通知 Storm 这个 tuple 被处理完成了,从而通知这个 tuple 的发射者 spouts。 一般的流程是: bolts 处理一个输入 tuple,发射0个或者多个 tuple,然后调用 ack 通知 storm 自己已经处理过这个 tuple 了。storm 提供了一个 IBasicBolt 会自动调用 ack。

四、实验环境

  • 云创大数据实验平台:
    在这里插入图片描述

  • Java 版本:jdk1.7.0_79

  • Hadoop 版本:hadoop-2.7.1

  • ZooKeeper 版本:zookeeper-3.4.6

  • Storm 版本:storm-0.10.0

五、实验内容和步骤

本实验主要演示一个完整的 Storm 拓扑编码过程,主要包含 Spout、Bolt 和构建 Topology 几个步骤。

(一)启动 Storm 集群

首先,启动 Storm 集群。

实验的准备工作是:域名映射、免密登录、JDK 配置、部署 ZooKeeper、部署 Storm 等。该实验可以点击一键搭建后能看到搭建成功,即可自动搭建好环境。

(二)导入依赖 jar 包

其次,将 Storm 安装包的 lib 目录内如下 jar 包导入到开发工具:

在这里插入图片描述

然后再在 Eclipse 中对每个 jar 执行如下操作进行添加配置:

在这里插入图片描述

出现这样即可:

在这里插入图片描述

(三)编写程序

我们在项目的 src 中首先创建一个cproc.word包。

在这里插入图片描述

然后,编写代码,实现一个完整的 Topology,内容如下:

Spout 随机发送单词,代码实现:

package cproc.word;import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;public class WordReaderSpout extends BaseRichSpout {private SpoutOutputCollector collector;@Overridepublic void open(Map conf, TopologyContext context, SpoutOutputCollector collector){this.collector = collector;}@Overridepublic void nextTuple() {//这个方法会不断被调用,为了降低它对CPU的消耗,让它sleep一下Utils.sleep(1000);final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};Random rand = new Random();String word = words[rand.nextInt(words.length)];collector.emit(new Values(word));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}
}

Bolt 单词计数,并每隔一秒打印一次,代码实现:

package cproc.word;import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;public class WordCounterBolt extends BaseBasicBolt {private static final long serialVersionUID = 5683648523524179434L;private HashMap<String, Integer> counters = new HashMap<String, Integer>();private volatile boolean edit = false;@Overridepublic void prepare(Map stormConf, TopologyContext context) {//定义一个线程1秒钟打印一次统计的信息new Thread(new Runnable() {public void run() {while (true) {if (edit) {for (Entry<String, Integer> entry : counters.entrySet()){System.out.println(entry.getKey() + " : " + entry.getValue());}edit = false;}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}}).start();}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {String str = input.getString(0);if (!counters.containsKey(str)) {counters.put(str, 1);} else {Integer c = counters.get(str) + 1;counters.put(str, c);}edit = true;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

构建 Topology 并提交到集群主函数,代码实现:

package cproc.word;import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;public class WordCountTopo {public static void main(String[] args) throws Exception{//构建TopologyTopologyBuilder builder = new TopologyBuilder();builder.setSpout("word-reader", new WordReaderSpout());builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-reader");Config conf = new Config();//集群方式提交StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,builder.createTopology());}
}

(四)打包上传并运行

将 Storm 代码打成wordCount-Storm.jar (打包的时候不要包含 storm 中的 jar,不然会报错的,将无法运行,即:wordCount-Storm.jar中只包含上面三个类的代码) 上传到主节点的/usr/cstor/storm/bin目录下。

在这里插入图片描述

这里需要注意的是我们不勾选上图框选的选项,这样就不会打包项目中的jar包。

在这里插入图片描述

在主节点进入 Storm 安装目录的 bin 下面用以下命令提交任务:

cd /usr/cstor/storm/bin
./storm jar wordCount-Storm.jar cproc.word.WordCountTopo wordCount

在这里插入图片描述

因为 topology 会永远运行,需要手动 kill 掉,使用以下命令结束 storm 任务:

./storm kill wordCount

在这里插入图片描述

六、实验结果

Storm 任务执行时,可以查看 Storm 日志文件,日志里面打印了统计的单词结果,日志内容如图。注意要到 slave1 服务器上查看日志文件。

cd /usr/cstor/storm/logs
ls
cat wordCount-1-1728785733-worker-6703.log

在这里插入图片描述
……

在这里插入图片描述

七、实验心得

  在本次 Storm 实验中,我深入了解了如何使用 Apache Storm 实现一个实时 WordCountTopology。Apache Storm 是一个开源的分布式实时计算系统,用于处理大量的数据流。通过本次实验,我不仅掌握了 Storm 的基本概念,还学会了如何使用 Java 代码来实现 Storm 任务的拓扑,以及如何将 Storm 任务提交到集群中运行。

  实验的核心是创建一个能够实时统计单词频率的 Topology。这个 Topology 由一个 Spout 和多个 Bolt 组成。Spout 负责生成或接收外部数据流,并将其转换为 Storm 内部的 Tuple(消息传递的基本单元)。在这个实验中,Spout 每秒随机生成一个单词,并将其发送给 Bolt。Bolt 则负责处理接收到的 Tuple,进行单词统计,并每隔一秒打印一次统计结果。

  在实验过程中,我首先通过 Eclipse 创建了一个 StormTest 项目,并导入了所需的依赖 jar 包。然后,我创建了三个 Java 类:WordReaderSpoutWordCounterBoltWordCountTopo。WordReaderSpout 负责生成单词,WordCounterBolt 负责将单词拆分和统计单词频率。最后,在 WordCountTopo 类中定义了 Topology 的结构,并将这些组件组织起来。

  在将 Topology 提交到 Storm 集群之前,我首先在本地模式下进行了测试。通过运行storm jar命令,我成功地将 Topology 提交给 Nimbus(Storm 的主节点),并在本地机器上模拟了 Storm 集群的运行环境。测试结果显示,Topology 能够正确地生成单词、拆分单词并统计单词频率。

  接下来,我将 Topology 提交到了实际的 Storm 集群中运行。在集群模式下,我需要注意一些额外的配置,如设置 worker 的数量、executor 的数量以及 task 的数量等。这些配置对于优化 Topology 的性能至关重要。通过合理地配置并行度,我成功地提高了 Topology 的处理效率。

  总的来说,这次 Storm 实验让我对分布式实时计算系统有了更深入的了解。通过实践,我不仅掌握了 Storm 的基本概念和操作方法,还学会了如何优化 Topology 的性能和解决实际问题。我相信这些经验和知识将对我未来的学习和工作产生积极的影响。

:以上文中的数据文件及相关资源下载地址:
链接:https://pan.quark.cn/s/d94bcadb79c8
提取码:U84E

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/446651.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手机、固话号码想要认证,需要显示企业名称该怎么设置?

在现如今激烈竞争的商业环境中&#xff0c;依然有越来越多的企业意识到品牌的力量与价值&#xff0c;作为吸引客户关注、打造客户第一印象的关键环节。如何让企业外呼号码展示品牌与企业名称就变得格外关键。 那么手机、固话号码申请号码品牌认证究竟是什么&#xff1f;申请的…

使用CSS Flexbox创建简洁时间轴

使用CSS Flexbox创建简洁时间轴 在网页设计中,时间轴是一种常见且有效的方式来展示事件的顺序和进程。本文将介绍如何使用CSS Flexbox创建一个简洁优雅的时间轴,无需复杂的JavaScript代码。 基本HTML结构 首先,我们需要创建基本的HTML结构: html复制<div class"ti…

IT招聘乱象的全面分析

近年来&#xff0c;IT行业的招聘要求似乎越来越苛刻&#xff0c;甚至有些不切实际。许多企业在招聘时&#xff0c;不仅要求前端工程师具备UI设计能力&#xff0c;还希望后端工程师精通K8S服务器运维&#xff0c;更有甚至希望研发经理掌握所有前后端框架和最新开发技术。这种招聘…

AI大模型是怎么运作的?深入解析

在当今这个日新月异的科技时代&#xff0c;人工智能&#xff08;AI&#xff09;如同一位隐形的助手&#xff0c;悄然渗透进我们生活的方方面面&#xff0c;其影响力日益显著。这位“隐形助手”背后的工作原理究竟是怎样的呢&#xff1f;接下来&#xff0c;本文将从AI的基本原理…

随机多智能体系统中的自然策略能力

本文探讨了在随机多智能体系统中采用自然策略进行PATL及PATL逻辑的模型检验问题。研究发现&#xff0c;当活跃联盟被限于确定性策略时&#xff0c;NatPATL的模型检验问题是NP完全的&#xff1b;在同样的限制条件下&#xff0c;NatPATL的复杂度则为2NEXPTIME。若不限制策略类型&…

2024全面大模型学习指南

前言 随着人工智能技术的迅猛发展&#xff0c;大模型&#xff08;Large Models&#xff09;已成为这一领域的新宠。从GPT系列到BERT&#xff0c;再到各类变体&#xff0c;大模型以其强大的能力吸引了无数开发者和研究者的目光。那么&#xff0c;作为一个零基础的学习者&#x…

2024 年 04 月编程语言排行榜,PHP 排名创新低?

编程语言的流行度总是变化莫测&#xff0c;每个月的排行榜都揭示着新的趋势。2024年4月的编程语言排行榜揭示了一个引人关注的现象&#xff1a;PHP的排名再次下滑&#xff0c;创下了历史新低。这种变化对于PHP开发者和整个技术社区来说&#xff0c;意味着什么呢&#xff1f; P…

ChatGPT国内中文版镜像网站整理合集(2024/10/06)

一、GPT中文镜像站 ① yixiaai.com 支持GPT4、4o以及o1&#xff0c;支持MJ绘画 ② chat.lify.vip 支持通用全模型&#xff0c;支持文件读取、插件、绘画、AIPPT ③ AI Chat 支持GPT3.5/4&#xff0c;4o以及MJ绘画 1. 什么是镜像站 镜像站&#xff08;Mirror Site&#xff…

LLMs训练避坑帖——如何高效 LLMs pretrain?

LLM训练-pretrain 这篇文章介绍下如何从零到一进行 pretrain 工作。 类似的文章应该有很多&#xff0c;不同的地方可能在于&#xff0c;我并不会去分析 pretrain 阶段的核心技术&#xff0c;而是用比较朴素的语言来描述这个大工程的每一块砖瓦。我的介绍偏方法论一些&#xf…

服务器信息安全可视化:增强风险监测

通过图扑可视化技术&#xff0c;将服务器的安全状态以图形化方式展示&#xff0c;帮助安全团队实时监控潜在威胁&#xff0c;提高快速响应能力&#xff0c;保障数据和系统的安全性与稳定性。

【MATLAB源码-第248期】基于matlab的EMD算法+ICA算法轴承故障分析。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 经验模态分解&#xff08;EMD&#xff09;与轴承故障识别 EMD的基本原理 EMD 是一种自适应的信号分解技术&#xff0c;最初由 Huang 等人在 1998 年提出&#xff0c;旨在分析非线性和非平稳信号。传统的信号处理方法通常假设…

绘制YOLOv11模型在训练过程中,精准率,召回率,mAP_0.5,mAP_0.5:0.95,以及各种损失的变化曲线

一、本文介绍 本文用于绘制模型在训练过程中,精准率,召回率,mAP_0.5,mAP_0.5:0.95,以及各种损失的变化曲线。用以比较不同算法的收敛速度,最终精度等,并且能够在论文中直观的展示改进效果。支持多文件的数据比较。 专栏目录:YOLOv11改进目录一览 | 涉及卷积层、轻量化…

E41.【C语言】练习:斐波那契函数的空间复杂度的计算及函数调用分析

1.题目 求下列代码的时间复杂度 long long f(size_t n) {if(n < 3)return 1;return f(n-1) f(n-2); } 2.解 显然是递归算法(递归讲解见35.【C语言】详解函数递归),可以画个二叉树分析 Fib嵌套函数调用细则的分析 进入f(n),返回f(n-1)f(n-2),注意:一次只能调用一个函数…

漫途以产品为导向,为集成商客户提供稳定、可靠的物联网终端设备!

无锡漫途科技有限公司成立于2014年8月至今已经十年有余&#xff0c;在这期间公司始终把“1344”战略作为核心指导方向。 “1”代表只做一件事&#xff0c;即以“物联网基础大数据服务商”为主要业务定位&#xff0c;围绕中国制造 2025&#xff0c;推动中国工业智能化转型升级&…

电脑录屏怎么录视频和声音?新手必看的屏幕录制技巧

我们在生活或工作中&#xff0c;经常需要用到屏幕录制&#xff0c;如用来制作教学视频、分享游戏直播、记录软件操作教程等&#xff0c;但对于新手来说&#xff0c;如何在电脑上录制既清晰又带有声音的视频&#xff0c;可能还是一个挑战。今天&#xff0c;我们就来分享三个实用…

华为OD机试 - 小明找位置 - 二分查找(Python/JS/C/C++ 2024 E卷 100分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试真题&#xff08;Python/JS/C/C&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加入华为OD刷题交流群&#xff0c;…

一句话就把HTTPS工作原理讲明白了

号主&#xff1a;老杨丨11年资深网络工程师&#xff0c;更多网工提升干货&#xff0c;请关注公众号&#xff1a;网络工程师俱乐部 上午好&#xff0c;我的网工朋友。 在当今互联网高度发达的时代&#xff0c;信息安全已成为不容忽视的重要议题。 随着越来越多的个人信息和敏感…

朗伯特反射模型

免责声明&#xff1a;本文所提供的信息和内容仅供参考。作者对本文内容的准确性、完整性、及时性或适用性不作任何明示或暗示的保证。在任何情况下&#xff0c;作者不对因使用本文内容而导致的任何直接或间接损失承担责任&#xff0c;包括但不限于数据丢失、业务中断或其他经济…

如何快速入门VCU应用层软件开发?(34篇实例讲解+软件开发测试方法+工具使用)

最近&#xff0c;用一个多月的时间总结了VCU应用层软件开发的基本流程&#xff0c;架构&#xff0c;关键模块的控制策略及Simulink建模方法、测试方法及相关工具的使用。如何快速入门VCU应用软件开发层软件开发&#xff0c;通过本篇文章可以给你答案。文章标题为超链接&#xf…

【MATLAB代码,带TDOA数据导入】TDOA三维空间的位置(1主锚点、3副锚点),多个时间点、输出位置的坐标

作品简介 【MATLAB代码&#xff0c;带TDOA数据导入】TDOA求三维下的位置&#xff0c;通过四个锚节点&#xff08;1主锚点、3副锚点)的信号传播时间差定位。 一次性求解多个时间点的位置&#xff0c;输出位置图像和点的坐标。 产品特点 精准定位&#xff1a;有效消除测距误差…