DRPC
概念
分布式RPC(DRPC)背后的想法是使用Storm在运行中并行计算真正强大的函数。 Storm拓扑接收函数参数流作为输入,并为每个函数调用发送结果的输出流。
DRPC并不是Storm的一个特征,因为它基于Storm的spouts,bolts和拓扑的高级抽象。DRPC本可以打包成Storm独立的库,但是跟storm绑定在一起很有用。
顶层视角
分布式RPC由“DRPC服务器”协调(Storm随附实现)。 DRPC服务器协调接收RPC请求,将请求发送到Storm拓扑,从Storm拓扑接收结果,并将结果发送回等待的客户端。 从客户端的角度来看,分布式RPC调用看起来就像常规的RPC调用。 例如,以下是客户端如何使用参数“http://twitter.com”计算“到达”函数的结果:
DRPCClient client = new DRPCClient("drpc-host", 3772);
String result = client.execute("reach", "http://twitter.com");
分布式RPC工作流程:
客户端向DRPC服务器发送要执行的函数名称以及该函数的参数。实现该功能的拓扑使用DRPCSpout从DRPC服务器接收函数调用流。 每个函数调用都由DRPC服务器标记唯一ID。 然后拓扑计算结果,在拓扑结束时,一个名为ReturnResults的bolt连接到DRPC服务器,并为其提供函数调用id的结果。 然后,DRPC服务器使用id来匹配客户端正在等待的结果,取消阻塞等待的客户端,并将结果发送给它。
LinearDRPCTopologyBuilder
Storm附带了一个名为LinearDRPCTopologyBuilder的拓扑构建器,它可以自动执行几乎所有涉及DRPC的步骤。 这些包括:
1、设置spout
2、将结果返回给DRPC服务器
3、为bolt提供功能,以便在tuple(元组)组上进行有限聚合
我们来看一个简单的例子。 这是DRPC拓扑的实现,它返回带有“!”的输入参数。附:
public static class ExclaimBolt extends BaseBasicBolt {public void execute(Tuple tuple, BasicOutputCollector collector) {String input = tuple.getString(1);collector.emit(new Values(tuple.getValue(0), input + "!"));}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "result"));}
}public static void main(String[] args) throws Exception {LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");builder.addBolt(new ExclaimBolt(), 3);// ...
}
正如你所看到的,没有几行代码。 创建LinearDRPCTopologyBuilder时,可以告诉它拓扑的DRPC函数的名称。 单个DRPC服务器可以协调许多功能,函数名称可以区分各个函数。 声明的第一个bolt将2元组作为输入,其中第一个字段是请求ID,第二个字段是该请求的参数。 LinearDRPCTopologyBuilder期望最后一个bolt发出一个输出流,其中包含[id,result]形式的2元组。 最后,所有中间元组都必须包含请求ID作为第一个字段。
在这个例子中,ExclaimBolt只是附加一个“!” 到元组的第二个字段。 LinearDRPCTopologyBuilder处理连接到DRPC服务器并返回结果的其余协调。
本地模式的DRPC
DRPC可以在本地模式运行。下面的例子说明了如何运行本地模式的DRPC:
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));cluster.shutdown();
drpc.shutdown();
首先,创建一个LocalDRPC对象。 此对象模拟正在进行的DRPC服务器,就像LocalCluster在进程中模拟Storm集群一样。 然后创建LocalCluster以在本地模式下运行拓扑。 LinearDRPCTopologyBuilder具有用于创建本地拓扑和远程拓扑的单独方法。 在本地模式下,LocalDRPC对象不会绑定到任何端口,因此拓扑需要知道要与之通信的对象。 这就是createLocalTopology将LocalDRPC对象作为输入接收的原因。
启动拓扑后,您可以使用LocalDRPC上的execute方法执行DRPC调用。
远程模式的DRPC
在实际集群上使用DRPC也很简单。 有三个步骤:
1、启动DRPC服务器
2、配置DRPC服务器的位置
3、将DRPC拓扑提交给Storm集群
启动DRPC服务器可以使用storm脚本完成,就像启动Nimbus或UI一样:
bin/storm drpc
接下来,您需要配置Storm群集以了解DRPC服务器的位置。 这就是DRPCSpout如何知道从何处读取函数调用。 这可以通过storm.yaml文件或拓扑配置来完成。 通过storm.yaml配置这个看起来像这样:
drpc.servers:- "drpc1.foo.com"- "drpc2.foo.com"
最后,像启动任何一个其他的拓扑一样,使用StormSubmitter启动DRPC拓扑。要在远程模式运行上述的示例,操作如下:
StormSubmitter
.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology用于为storm集群创建合适的拓扑。
稍微复杂的示例
感叹号DRPC示例是用于说明DRPC概念的玩具示例。让我们看一个更复杂的例子,它真正需要Storm集群为计算DRPC函数提供的并行性。我们将看到的示例是在Twitter上计算URL的范围。
URL的范围是在Twitter上暴露给URL的唯一人数。要计算覆盖面,您需要:
1、获取推文网址的所有人
2、获得所有这些人的所有粉丝
3、独特的追随者
4、统计一组独特的粉丝
在计算过程中,单个到达计算可能涉及数千个数据库调用和数千万个跟随者记录。这是一个非常非常密集的计算。正如您将要看到的那样,在Storm之上实现此功能非常简单。在一台计算机上,达到计算可能需要几分钟;在Storm集群中,您可以在几秒钟内计算最难的URL的覆盖率。
此处的storm-starter中定义了样本范围拓扑。以下是定义范围拓扑的方法:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
builder.addBolt(new GetTweeters(), 3);
builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
builder.addBolt(new CountAggregator(), 2).fieldsGrouping(new Fields("id"));
拓扑执行为四个步骤:
1、GetTweeters获取推文URL的用户。它将[id,url]的输入流转换为[id,tweeter]的输出流。每个url元组将映射到许多tweeter元组。
2、GetFollowers获得推特的追随者。它将[id,tweeter]的输入流转换为[id,follower]的输出流。在所有任务中,当有人跟随多个发布相同URL的人时,可能会有重复的跟随元组。
3、PartialUniquer通过关注者ID对关注者流进行分组。这具有相同的跟随者执行相同任务的效果。因此,PartialUniquer的每项任务都将获得相互独立的追随者。一旦PartialUniquer收到针对请求ID的所有针对它的关注元组,它就会发出其关注者子集的唯一计数。
4、最后,CountAggregator接收来自每个PartialUniquer任务的部分计数,并将它们相加以完成到达计算。
PartialUniquer代码:
public class PartialUniquer extends BaseBatchBolt {BatchOutputCollector _collector;Object _id;Set<String> _followers = new HashSet<String>();@Overridepublic void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_collector = collector;_id = id;}@Overridepublic void execute(Tuple tuple) {_followers.add(tuple.getString(1));}@Overridepublic void finishBatch() {_collector.emit(new Values(_id, _followers.size()));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("id", "partial-count"));}
}
PartialUniquer通过扩展BaseBatchBolt实现IBatchBolt。批处理bolt提供了一个第一类API来处理一批元组作为具体单元。为每个请求ID创建一个新的批处理bolt实例,Storm会在适当的时候负责清理实例。
当PartialUniquer在execute方法中收到一个跟随元组时,它会将它添加到内部HashSet中的请求ID的集合中。
批处理bolt提供finishBatch方法,该方法在处理了针对此任务的此批处理的所有元组之后调用。在回调中,PartialUniquer会发出一个元组,其中包含其跟随者id子集的唯一计数。
在底层,CoordinatedBolt用于检测给定的bolt何时收到任何给定请求ID的所有元组。 CoordinatedBolt利用直接流来管理这种协调。
拓扑的其余部分应该是不言自明的。如您所见,到达计算的每一步都是并行完成的,定义DRPC拓扑非常简单。
非线性DRPC拓扑
LinearDRPCTopologyBuilder仅处理“线性”DRPC拓扑,其中计算表示为一系列步骤(如覆盖范围)。 不难想象函数需要更复杂的拓扑结构,包括bolt的分支和合并。 现在,要做到这一点,你需要直接使用CoordinatedBolt。 请务必在邮件列表中讨论非线性DRPC拓扑的用例,以便为DRPC拓扑构建更一般的抽象。
LinearDRPCTopologyBuilder工作流程:
DRPCSpout发射[args, return-info]。return-info是DRPC服务器的主机名和端口号,以及DRPC服务器生成的id。
创建一个拓扑包括:
- DRPCSpout
- PrepareRequest(生成请求ID,为返回信息创建一个流,为参数创建一个流)
- CoordinatedBolt
- JoinResult(使用return info合并结果)
- ReturnResult(连接DRPC服务器以及返回结果)
LinearDRPCTopologyBuilder是在storm原语之上构建高级别抽象的一个很好的例子。
进阶
KeyedFairBolt用于编织多个同时请求的处理
如何直接使用CoordinatedBolt
DRPC (Distributed RPC) remote procedure call
分布式远程过程调用
DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)
DRPC设计目的:
为了充分利用Storm的计算能力实现高密度的并行实时计算。
(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)
客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。
定义DRPC拓扑
方法1:
通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现
方法2:
直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults
运行模式:
1、本地模式
2、远程模式(集群模式)
修改配置文件conf/storm.yaml
drpc.servers:
- "node1“
启动DRPC Server
bin/storm drpc &
通过StormSubmitter.submitTopology提交拓扑
案例:
Twitter 中某个URL的受众人数统计(这篇twitter到底有多少人看到过)