大数据-Storm流式框架(五)---DRPC

DRPC

概念

分布式RPC(DRPC)背后的想法是使用Storm在运行中并行计算真正强大的函数。 Storm拓扑接收函数参数流作为输入,并为每个函数调用发送结果的输出流。

DRPC并不是Storm的一个特征,因为它基于Storm的spouts,bolts和拓扑的高级抽象。DRPC本可以打包成Storm独立的库,但是跟storm绑定在一起很有用。

顶层视角

分布式RPC由“DRPC服务器”协调(Storm随附实现)。 DRPC服务器协调接收RPC请求,将请求发送到Storm拓扑,从Storm拓扑接收结果,并将结果发送回等待的客户端。 从客户端的角度来看,分布式RPC调用看起来就像常规的RPC调用。 例如,以下是客户端如何使用参数“http://twitter.com”计算“到达”函数的结果:

DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");

分布式RPC工作流程:

客户端向DRPC服务器发送要执行的函数名称以及该函数的参数。实现该功能的拓扑使用DRPCSpout从DRPC服务器接收函数调用流。 每个函数调用都由DRPC服务器标记唯一ID。 然后拓扑计算结果,在拓扑结束时,一个名为ReturnResults的bolt连接到DRPC服务器,并为其提供函数调用id的结果。 然后,DRPC服务器使用id来匹配客户端正在等待的结果,取消阻塞等待的客户端,并将结果发送给它。

LinearDRPCTopologyBuilder

Storm附带了一个名为LinearDRPCTopologyBuilder的拓扑构建器,它可以自动执行几乎所有涉及DRPC的步骤。 这些包括:

     1、设置spout

     2、将结果返回给DRPC服务器

     3、为bolt提供功能,以便在tuple(元组)组上进行有限聚合

我们来看一个简单的例子。 这是DRPC拓扑的实现,它返回带有“!”的输入参数。附:

public static class ExclaimBolt extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}
}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);// ...
}

正如你所看到的,没有几行代码。 创建LinearDRPCTopologyBuilder时,可以告诉它拓扑的DRPC函数的名称。 单个DRPC服务器可以协调许多功能,函数名称可以区分各个函数。 声明的第一个bolt将2元组作为输入,其中第一个字段是请求ID,第二个字段是该请求的参数。 LinearDRPCTopologyBuilder期望最后一个bolt发出一个输出流,其中包含[id,result]形式的2元组。 最后,所有中间元组都必须包含请求ID作为第一个字段。

在这个例子中,ExclaimBolt只是附加一个“!” 到元组的第二个字段。 LinearDRPCTopologyBuilder处理连接到DRPC服务器并返回结果的其余协调。

本地模式的DRPC

DRPC可以在本地模式运行。下面的例子说明了如何运行本地模式的DRPC:

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));cluster.shutdown();
drpc.shutdown();

首先,创建一个LocalDRPC对象。 此对象模拟正在进行的DRPC服务器,就像LocalCluster在进程中模拟Storm集群一样。 然后创建LocalCluster以在本地模式下运行拓扑。 LinearDRPCTopologyBuilder具有用于创建本地拓扑和远程拓扑的单独方法。 在本地模式下,LocalDRPC对象不会绑定到任何端口,因此拓扑需要知道要与之通信的对象。 这就是createLocalTopology将LocalDRPC对象作为输入接收的原因。

启动拓扑后,您可以使用LocalDRPC上的execute方法执行DRPC调用。

远程模式的DRPC

在实际集群上使用DRPC也很简单。 有三个步骤:

     1、启动DRPC服务器

     2、配置DRPC服务器的位置

     3、将DRPC拓扑提交给Storm集群

启动DRPC服务器可以使用storm脚本完成,就像启动Nimbus或UI一样:

bin/storm drpc

接下来,您需要配置Storm群集以了解DRPC服务器的位置。 这就是DRPCSpout如何知道从何处读取函数调用。 这可以通过storm.yaml文件或拓扑配置来完成。 通过storm.yaml配置这个看起来像这样:

drpc.servers:- "drpc1.foo.com"- "drpc2.foo.com"

最后,像启动任何一个其他的拓扑一样,使用StormSubmitter启动DRPC拓扑。要在远程模式运行上述的示例,操作如下:

StormSubmitter
.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

createRemoteTopology用于为storm集群创建合适的拓扑。

稍微复杂的示例

感叹号DRPC示例是用于说明DRPC概念的玩具示例。让我们看一个更复杂的例子,它真正需要Storm集群为计算DRPC函数提供的并行性。我们将看到的示例是在Twitter上计算URL的范围。

URL的范围是在Twitter上暴露给URL的唯一人数。要计算覆盖面,您需要:

    1、获取推文网址的所有人

    2、获得所有这些人的所有粉丝

    3、独特的追随者

    4、统计一组独特的粉丝

在计算过程中,单个到达计算可能涉及数千个数据库调用和数千万个跟随者记录。这是一个非常非常密集的计算。正如您将要看到的那样,在Storm之上实现此功能非常简单。在一台计算机上,达到计算可能需要几分钟;在Storm集群中,您可以在几秒钟内计算最难的URL的覆盖率。

此处的storm-starter中定义了样本范围拓扑。以下是定义范围拓扑的方法:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2).fieldsGrouping(new Fields("id"));

拓扑执行为四个步骤:

    1、GetTweeters获取推文URL的用户。它将[id,url]的输入流转换为[id,tweeter]的输出流。每个url元组将映射到许多tweeter元组。

    2、GetFollowers获得推特的追随者。它将[id,tweeter]的输入流转换为[id,follower]的输出流。在所有任务中,当有人跟随多个发布相同URL的人时,可能会有重复的跟随元组。

    3、PartialUniquer通过关注者ID对关注者流进行分组。这具有相同的跟随者执行相同任务的效果。因此,PartialUniquer的每项任务都将获得相互独立的追随者。一旦PartialUniquer收到针对请求ID的所有针对它的关注元组,它就会发出其关注者子集的唯一计数。

4、最后,CountAggregator接收来自每个PartialUniquer任务的部分计数,并将它们相加以完成到达计算。

PartialUniquer代码:

public class PartialUniquer extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;Set<String> _followers = new HashSet<String>();@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_followers.add(tuple.getString(1));}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _followers.size()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "partial-count"));}
}

PartialUniquer通过扩展BaseBatchBolt实现IBatchBolt。批处理bolt提供了一个第一类API来处理一批元组作为具体单元。为每个请求ID创建一个新的批处理bolt实例,Storm会在适当的时候负责清理实例。

当PartialUniquer在execute方法中收到一个跟随元组时,它会将它添加到内部HashSet中的请求ID的集合中。

批处理bolt提供finishBatch方法,该方法在处理了针对此任务的此批处理的所有元组之后调用。在回调中,PartialUniquer会发出一个元组,其中包含其跟随者id子集的唯一计数。

在底层,CoordinatedBolt用于检测给定的bolt何时收到任何给定请求ID的所有元组。 CoordinatedBolt利用直接流来管理这种协调。

拓扑的其余部分应该是不言自明的。如您所见,到达计算的每一步都是并行完成的,定义DRPC拓扑非常简单。

非线性DRPC拓扑

LinearDRPCTopologyBuilder仅处理“线性”DRPC拓扑,其中计算表示为一系列步骤(如覆盖范围)。 不难想象函数需要更复杂的拓扑结构,包括bolt的分支和合并。 现在,要做到这一点,你需要直接使用CoordinatedBolt。 请务必在邮件列表中讨论非线性DRPC拓扑的用例,以便为DRPC拓扑构建更一般的抽象。

LinearDRPCTopologyBuilder工作流程:

DRPCSpout发射[args, return-info]。return-info是DRPC服务器的主机名和端口号,以及DRPC服务器生成的id。

创建一个拓扑包括:

  1. DRPCSpout
  2. PrepareRequest(生成请求ID,为返回信息创建一个流,为参数创建一个流)
  3. CoordinatedBolt
  4. JoinResult(使用return info合并结果)
  5. ReturnResult(连接DRPC服务器以及返回结果)

LinearDRPCTopologyBuilder是在storm原语之上构建高级别抽象的一个很好的例子。

进阶

KeyedFairBolt用于编织多个同时请求的处理

如何直接使用CoordinatedBolt

DRPC (Distributed RPC)  remote procedure call

分布式远程过程调用

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。

DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。

(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:

为了充分利用Storm的计算能力实现高密度的并行实时计算。

(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义DRPC拓扑

方法1:

通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)

该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现

方法2:

直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑

需要手动设定好开始的DRPCSpout以及结束的ReturnResults

运行模式:

1、本地模式

2、远程模式(集群模式)

修改配置文件conf/storm.yaml

drpc.servers:

    - "node1“

启动DRPC Server

bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑

案例:

Twitter 中某个URL的受众人数统计(这篇twitter到底有多少人看到过)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/171868.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【OpenCV实现平滑图像形态学变化】

文章目录 概要目标腐蚀膨胀开运算结构元素&#xff08;内核&#xff09;小结 概要 形态学变化是一组简单的图像操作&#xff0c;主要用于处理二值图像&#xff0c;即只包含黑和白两种颜色的图像。这些操作通常需要两个输入&#xff0c;原始图像和一个内核&#xff08;kernel&a…

JVM进阶(3)

一)什么是垃圾&#xff1f; 垃圾指的是在应用程序中没有任何指针指向的对象&#xff0c;这个对象就是需要被回收的垃圾&#xff0c;如果不及时的针对内存中的垃圾进行清理&#xff0c;那么这些垃圾对象所占用的内存空间可能一直保留到应用程序结束&#xff0c;被保留的空间无法…

一文详解汽车电CAN总线

1.什么是CAN总线 CAN总线(控制器区域网络)是一个中央网络系统&#xff0c;连接不同的电子控制单元(ECU)&#xff0c;车辆中的其他设备。现代汽车可以有100个ECU&#xff0c;因此CAN总线通信变得非常重要。 2.CAN总线流行的背景 集中式:CAN总线系统允许对连接到网络的ECU进行集…

前端移动web高级详细解析一

01-平面转换 简介 作用&#xff1a;为元素添加动态效果&#xff0c;一般与过渡配合使用 概念&#xff1a;改变盒子在平面内的形态&#xff08;位移、旋转、缩放、倾斜&#xff09; 平面转换也叫 2D 转换&#xff0c;属性是 transform 平移 transform: translate(X轴移动距…

Android开发知识学习——编码、加密、Hash、序列化和字符集

文章目录 学习资源来自&#xff1a;扔物线加密古代密码学现代密码学对称加密非对称加密密码学密钥和登录密码Base64URL 使用的百分号编码压缩与解压缩图片与音频、视频编解码 序列化Hash字符集课后题 学习资源来自&#xff1a;扔物线 加密 古代密码学 起源&#xff1a;古代战…

C/C++面试常见问题——const关键字的作用和用法

首先我们需要一下const关键字的定义&#xff0c;const名叫常量限定符&#xff0c;当const修饰变量时&#xff0c;就是在告诉编译器该变量只可访问不可修改&#xff0c;而编译器对于被const修饰的变量有一个优化&#xff0c;编译器不会专门为其开辟空间&#xff0c;而是将变量名…

Win10中Pro/E鼠标滚轮不能缩放该怎么办?

Pro/E安装好后&#xff0c;鼠标滚轮不能缩放模型&#xff0c;该怎么办&#xff1f;问题多发生在win8/win10上&#xff0c;新装了PROE&#xff0c;发现滑动鼠标中键不能放大缩小。 彩虹图纸管理软件_图纸管理系统_图纸文档管理软件系统_彩虹EDM【官网】彩虹EDM图纸管理软件系统…

Windows下安装Anaconda、Pycharm以及iflycode插件图解

目录 一、下载Anaconda、Pycharm以及iflycode插件 二、创建相关文件夹 三、Pycharm社区版安装详细步骤 四、Anaconda安装详细步骤 五、配置Pycharm 六、安装iflycode插件 Anaconda是一款集成的Python环境&#xff0c;anaconda可以看做Python的一个集成安装&#xff0c;安…

WebGL笔记:矩阵的变换之平移的实现

矩阵的变换 变换 变换有三种状态&#xff1a;平移、旋转、缩放。当我们变换一个图形时&#xff0c;实际上就是在移动这个图形的所有顶点。解释 webgl 要绘图的话&#xff0c;它是先定顶点的&#xff0c;就比如说我要画个三角形&#xff0c;那它会先把这三角形的三个顶点定出来…

云端代码编辑器Atheos

什么是 Atheos &#xff1f; Atheos是一个基于 Web 的 IDE 框架&#xff0c;占用空间小且要求最低&#xff0c;构建于 Codiad 之上&#xff0c;不过 Atheos 已从原始 Codiad 项目完全重写&#xff0c;以利用更现代的工具、更简洁的代码和更广泛的功能。 注意事项 群晖内核版本太…

【计算机毕设小程序案例】基于微信小程序的图书馆座位预定系统

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 &#x1f449;IT源码社-SpringBoot优质案例推荐&#x1f448; &#x1f449;IT源码社-小程序优质案例…

理解V3中的proxy和reflect

现有如下面试题 结合GeexCode和Gpt // 这个函数名为onWatch&#xff0c;接受三个参数obj、setBind和getlogger。 // obj是需要进行监视的对象。 // setBind是一个回调函数&#xff0c;用于在设置属性时进行绑定操作。 // getlogger是一个回调函数&#xff0c;用于在获取属性时…

项目管理工具ConceptDraw PROJECT mac中文版自定义列功能

ConceptDraw PROJECT Mac是一款专业的项目管理工具&#xff0c;适用于MacOS平台。它提供了成功规划和执行项目所需的完整功能&#xff0c;包括任务和资源管理、报告和变更控制。 这款软件可以与ConceptDraw office集成&#xff0c;利用思维导图和数据可视化的强大功能来改进项目…

【Linux】操作系统以及虚拟机的安装与配置

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于Linux的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.操作系统的介绍 二.VMWare虚拟机的安装…

安装虚拟机(VMware)保姆级教程及配置虚拟网络编辑器和安装WindowsServer以及宿主机访问虚拟机和配置服务器环境

目录 一、操作系统 1.1.什么是操作系统 1.2.常见操作系统 1.3.个人版本和服务器版本的区别 1.4.Linux的各个版本 二、VMware Wworkstation Pro虚拟机的安装 1.下载与安装 注意&#xff1a;VMWare虚拟网卡 2.配置虚拟网络编辑器 三、安装配置 WindowsServer 1.创建虚拟…

底层全部重构,小米澎湃OS完整系统架构公布

上周&#xff0c;雷军发文称小米全新操作系统澎湃 OS 正式版已完成封包&#xff0c;将逐步接替 MIUI。而后&#xff0c;又有网友曝光小米澎湃 OS 界面。 今日&#xff0c;雷军再度发表长文预热小米澎湃 OS&#xff0c;正式公布了完整系统架构。 据介绍&#xff0c;从架构设计之…

【咕咕送书 | 第四期】《ChatGPT 驱动软件开发:AI 在软件研发全流程中的革新与实践》

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《粉丝福利》 《C语言进阶篇》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 ⛳️ 写在前面参与规则一、前言1.0 人工智能新技术如何创新工作 &#xff1f; 二、内容简介三、作者简介四、专家推…

请解释一下React中的条件渲染(conditional rendering)。

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

Linux 系统调用IO口,利用光标偏移实现文件复制

用系统调用IO函数实现从一个文件读取最后2KB数据并复制到另一个文件中&#xff0c;源文件以只读方式打开&#xff0c;目标文件以只写的方式打开&#xff0c;若目标文件不存在&#xff0c;可以创建并设置初始值为0664&#xff0c;写出相应代码&#xff0c;要对出错情况有一定的处…

【C++项目】高并发内存池第六讲 当申请内存大于256K时的处理

目录 1.申请过程2.释放过程 1.申请过程 当申请的内存大于256kb时直接向堆中申请&#xff1a; static void* ConcurrentAlloc(size_t size) {if (size > MAX_BYTES){size_t alignSize SizeClass::RoundUp(size);size_t kpage alignSize >> PAGE_SHIFT;PageCache::…