WordReader.java:

package spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

public class WordReader extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {}

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each line of the file.
     */
    public void nextTuple() {
        /**
         * nextTuple() is called in a loop forever, so once we have read
         * the file we just sleep a while and return.
         */
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                /**
                 * For each line, emit a new tuple with the line as its
                 * value, passing the line itself as the message ID so
                 * that ack()/fail() above are called with it.
                 */
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and store the collector object.
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the single output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}
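Since nextTuple() passes each line as both the tuple value and the message ID, Storm calls ack() or fail() above with that same line once the tuple tree completes or times out. To sanity-check what the spout emits, here is a minimal Storm-free dry run; it assumes the same D:\words.txt path that TopologyMain configures below:

import java.io.BufferedReader;
import java.io.FileReader;

// Standalone sketch: print each line in place of collector.emit(new Values(str), str).
public class SpoutDryRun {
    public static void main(String[] args) throws Exception {
        BufferedReader reader = new BufferedReader(new FileReader("D:\\words.txt"));
        String str;
        while ((str = reader.readLine()) != null) {
            System.out.println("emit: " + str);
        }
        reader.close();
    }
}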
WordNormalizer.java:

package bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    public void cleanup() {}

    /**
     * The bolt receives a line from the words file and processes it
     * to normalize it.
     *
     * Normalizing means lower-casing the words and splitting the line
     * to get every individual word in it.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                System.out.println("-------word:" + word);
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt emits only the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
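The normalization logic is plain Java, so it can be tried outside Storm. A minimal sketch of the same split/trim/lower-case steps, using a made-up sample sentence:

// Standalone sketch of execute() above, minus the Storm plumbing.
public class NormalizeDemo {
    public static void main(String[] args) {
        String sentence = "My Storm  Word Count";
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                System.out.println(word.toLowerCase()); // prints: my, storm, word, count
            }
        }
    }
}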
WordCounter.java:

package bolts;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the topology (when the cluster is shut down)
     * we print the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * On creation, initialize the counter map and remember which
     * component and task this instance is running as.
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        /**
         * If the word doesn't exist in the map yet, create an entry
         * for it; otherwise add 1 to its count.
         */
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}
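The containsKey/put branch in execute() is the classic pre-Java-8 counting idiom. On Java 8 and later the same update can be written as a single Map.merge call; a standalone sketch with made-up sample words:

import java.util.HashMap;
import java.util.Map;

// Same counting logic as execute(), condensed with Map.merge (Java 8+).
public class CountDemo {
    public static void main(String[] args) {
        Map<String, Integer> counters = new HashMap<String, Integer>();
        for (String word : new String[]{"storm", "is", "great", "storm"}) {
            counters.merge(word, 1, Integer::sum);
        }
        System.out.println(counters); // e.g. {great=1, storm=2, is=1} (iteration order unspecified)
    }
}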
TopologyMain.java:

import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {

        //Topology definition
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"));

        //Configuration
        Config conf = new Config();
        //String fileName = args[0]; // alternatively, take the file from the command line
        String fileName = "D:\\words.txt";
        conf.put("wordsFile", fileName);
        conf.setDebug(false);

        //Topology run
        // Allow at most one un-acked spout tuple in flight at a time
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
        Thread.sleep(50000);

        cluster.shutdown();
    }
}
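TopologyMain reads the input file from the hard-coded path D:\words.txt. To run the example end to end, a hypothetical helper like the following can create a small sample file first (the words are made up; any space-separated text works):

import java.io.FileWriter;
import java.io.IOException;

// Hypothetical helper: write a few sample lines to the path TopologyMain reads.
public class CreateWordsFile {
    public static void main(String[] args) throws IOException {
        FileWriter writer = new FileWriter("D:\\words.txt");
        writer.write("storm test are great\n");
        writer.write("is an storm simple application\n");
        writer.write("but very powerful really storm is great\n");
        writer.close();
    }
}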