分布式RPC
- DRPCClient client = new DRPCClient("drpc-host", 3772);
- String result = client.execute("reach", "http://twitter.com");
客户端发送功能名称及功能所需参数到DRPC服务器去执行。图中的拓扑实现了此功能,它使用DRPCSpout从DRPC服务器接收功能调用流。每个功能调用通过DRPC服务器使用唯一ID标记,随后拓扑计算结果,在拓扑的最后,一个称之为“ReturnResults”的bolt连接到DRPC服务器,把结果交给这个功能调用(根据功能调用ID),DRPC服务器根据ID找到等待中的客户端,为等待中的客户端消除阻塞,并发送结果给客户端。
- public static class ExclaimBolt implements IBasicBolt {
- public void prepare(Map conf, TopologyContext context) {
- }
- public void execute(Tuple tuple, BasicOutputCollector collector) {
- String input = tuple.getString(1);
- collector.emit(new Values(tuple.getValue(0), input + "!"));
- }
- public void cleanup() {
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "result"));
- }
- }
- public static void main(String[] args) throws Exception {
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
- builder.addBolt(new ExclaimBolt(), 3);
- // ...
- }
如你所见,代码非常少。当创建时,你把这个拓扑的DRPC功能名称告诉storm。一个DRPC服务器可以协调许多功能,功能名称用于区别不同的功能,首先声明的bolt将接收一个输入的2-tuples,第一个字段是请求ID,第二个字段是请求参数。认为最后的bolt会发射一个输出流,该输出流包含[id, result]格式的2-tuples。最后,所有拓扑中间过程产生的元组(tuple)都包含请求id作为其第一个字段。
- LocalDRPC drpc = new LocalDRPC();
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
- System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
- cluster.shutdown();
- drpc.shutdown();
- bin/storm drpc
- drpc.servers:
- - "drpc1.foo.com"
- - "drpc2.foo.com"
- StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
- LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
- builder.addBolt(new GetTweeters(), 3);
- builder.addBolt(new GetFollowers(), 12)
- .shuffleGrouping();
- builder.addBolt(new PartialUniquer(), 6)
- .fieldsGrouping(new Fields("id", "follower"));
- builder.addBolt(new CountAggregator(), 2)
- .fieldsGrouping(new Fields("id"));
1. GetTweeters获取tweeted the URL的用户。它转换一个[id, url]形式的输入流到[id, tweeter]形式的输出流。每个url元组将映射到多个tweeter元组。
2. GetFollowers获取这些tweeter的追随者。它转换一个[id, tweeter]形式的输入流到[id, follower]形式的输出流。跨所有任务,当某人追随多个tweeter,这些tweeter又tweeted相同的URL时,这可能会得到重复的追随者。
- public static class PartialUniquer implements IRichBolt, FinishedCallback {
- OutputCollector _collector;
- Map
- public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
- _collector = collector;
- }
- public void execute(Tuple tuple) {
- Object id = tuple.getValue(0);
- Set
curr = _sets.get(id); - if(curr==null) {
- curr = new HashSet
(); - _sets.put(id, curr);
- }
- curr.add(tuple.getString(1));
- _collector.ack(tuple);
- }
- public void cleanup() {
- }
- public void finishedId(Object id) {
- Set
curr = _sets.remove(id); - int count;
- if(curr!=null) {
- count = curr.size();
- } else {
- count = 0;
- }
- _collector.emit(new Values(id, count));
- }
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "partial-count"));
- }
- }
在底层,用于检测一个bolt何时收到该请求ID的所有元组。CoordinatedBolt使用direct stream管理协调。
DRPCSpout发射[args, ],return-info是DRPC服务器的主机和端口,还有DRPC服务器生成的ID。
- DRPCSpout
- PrepareRequest(生成一个请求ID,创建一个返回信息流,一个参数流)
- CoordinatedBolt包装器和直接分组
- JoinResult(同返回信息一起连接结果)
- ReturnResult(连接DRPC服务器并返回结果)
- LinearDRPCTopologyBuilder是一个构建在Storm原语之上的高层次抽象的好例子。
- 同时编排处理多个请的KeyedFairBolt
- 如何直接使用