“源代码走读系列”从代码层面分析了storm的具体实现,然后通过具体实例说明storm的使用情况。由于storm已经正式迁移到Apache,文章系列也是twitter storm转变为apachee storm.
WordCountTopology 使用storm统计文件中每个单词的出现次数。
通过这个例子来解释tuple发送时的几个要素
- source component 发送源
- destination component 接收者
- stream 消息通道
- tuple 消息本身
本文所涉及的开发环境建设可参考前两篇博文。
- arch linux简易安装指南
- 在archlinux上建立storm cluster
事实上,统计文件中的单词是Linux的下一个非常常见的任务。它可以很容易地用awk解决(如果文件不太大)。以下是word counting的awk脚本将其保存为wordcountt.awk文件。
wordcount.awk
{ for (i = 1; i<=NF; i++) freq[$i]++ }END{ for (word in freq) printf "%s\t%d\n",word,freq[word]}
操作脚本,统计文件中的单词
gawk -f wordcount.awk filename
原始版本
在github上复制内容
git clone https://github.com/nathanmarz/storm-starter.git
编译运行
lein depslein compilejava -cp $(lein classpath) WordCountTopology
main函数
main函数的主要内容
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
注:grouping操作时,如果指定stream没有显示 id,使用default stream. 例如shuflegrouping(”spout)表示从名为spout的component中接收default stream发送的tupleam.
改进版本
在原版中,spout不断地向split发送 bolt随机发送句子,Count bolt统计每个单词出现的次数。
读完文件后,Spout能通知下游bolt显示最新的统计结果吗?
为实现上述改进目标,可采用上图所示的结构。变化如下,
- 在Spout中添加SUCCES_STREAM
- statisticsscs只添加一个操作实例 bolt
- 当spout读取文件内容时,通过SUCCES_STREAM告诉statisticstics bolt,文件已经处理好,可以打印当前的统计结果
添加SUCCESS_STREAM
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); declarer.declareStream("SUCCESS_STREAM",new Fields("word")); }
nextTuple
使用SUCCES_STREAM通知下游,文件处理完毕
@Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; if ( count == sentences.length ) { System.out.println(count+" try to emit tuple by success_stream"); _collector.emit("SUCCESS_STREAM",new Values(sentences[0])); count++; }else if ( count < sentences.length ){ _collector.emit(new Values(sentences[count])); count++; } }
WordCountTopology.java 添加静态WordCount2
public static class WordCount2 extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) { System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) { System.out.println(key+"\t"+counts.get(key)); } System.out.println("finish printing"); }else { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); } }
main函数
将spout的并行数从5改为1
builder.setSpout("spout", new RandomSentenceSpout(), 1);
将WordCount2添加到原始Topology中 Bolt
builder.setBolt(count2”, new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");
WordCount2 从Countt接收Bolt 通过default通过Bolt通过defalt stream发送的tuple,Spout通过SUCESS_STREAM发送的tuple,也就是说wordcount2将从两个stream接收数据。
编译编译修改后的源文件
cd $STROM_STARTERlein compile storm.starter
可能会出现以下异常信息,可以忽略。
Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:
运行
WordCountopology在local模式下运行修改后
java -cp $(lein classpath) storm.starter.WordCountTopology
假如一切正常,日志如下所示,线程的名称可能会有所不同。
moon 1score 1cow 1doctor 1over 1nature 1snow 1four 1keeps 1with 1a 1white 1dwarfs 1at 1the 4and 2i 1two 1away 1seven 2apple 1am 1an 1jumped 1day 1years 1ago 1
结果验证
与awk脚本的运行结果相比,WordCountopology的运行结果应该是一致的。
小技巧- awk脚本的执行结果存储为文件result1.log, WordCountopology输出中的单词统计部分存储在result2.log
- 用vim打开result1.log,sorting,保存结果;用vim打开result2.log,保存sorting。
- 然后用vimdiff进行比较 vimdiff result1.log result2.log
转载 :
“源代码走读系列”从代码层面分析了storm的具体实现,然后通过具体实例说明storm的使用情况。由于storm已经正式迁移到Apache,文章系列也是twitter storm转变为apachee storm.
WordCountTopology 使用storm统计文件中每个单词的出现次数。
通过这个例子来解释tuple发送时的几个要素
- source component 发送源
- destination component 接收者
- stream 消息通道
- tuple 消息本身
本文所涉及的开发环境建设可参考前两篇博文。
- arch linux简易安装指南
- 在archlinux上建立storm cluster
事实上,统计文件中的单词是Linux的下一个非常常见的任务。它可以很容易地用awk解决(如果文件不太大)。以下是word counting的awk脚本将其保存为wordcountt.awk文件。
wordcount.awk
{ for (i = 1; i<=NF; i++) freq[$i]++ }END{ for (word in freq) printf "%s\t%d\n",word,freq[word]}
操作脚本,统计文件中的单词
gawk -f wordcount.awk filename
原始版本
在github上复制内容
git clone https://github.com/nathanmarz/storm-starter.git
编译运行
lein depslein compilejava -cp $(lein classpath) WordCountTopology
main函数
main函数的主要内容
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
注:grouping操作时,如果指定stream没有显示 id,使用default stream. 例如shuflegrouping(”spout)表示从名为spout的component中接收default stream发送的tupleam.
改进版本
在原版中,spout不断地向split发送 bolt随机发送句子,Count bolt统计每个单词出现的次数。
读完文件后,Spout能通知下游bolt显示最新的统计结果吗?
为实现上述改进目标,可采用上图所示的结构。变化如下,
- 在Spout中添加SUCCES_STREAM
- statisticsscs只添加一个操作实例 bolt
- 当spout读取文件内容时,通过SUCCES_STREAM告诉statisticstics bolt,文件已经处理好,可以打印当前的统计结果
添加SUCCESS_STREAM
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); declarer.declareStream("SUCCESS_STREAM",new Fields("word")); }
nextTuple
使用SUCCES_STREAM通知下游,文件处理完毕
@Override public void nextTuple() { Utils.sleep(100); String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; if ( count == sentences.length ) { System.out.println(count+" try to emit tuple by success_stream"); _collector.emit("SUCCESS_STREAM",new Values(sentences[0])); count++; }else if ( count < sentences.length ){ _collector.emit(new Values(sentences[count])); count++; } }
WordCountTopology.java 添加静态WordCount2
public static class WordCount2 extends BaseBasicBolt { Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) { System.out.println("prepare to print the statistics"); for (String key : counts.keySet()) { System.out.println(key+"\t"+counts.get(key)); } System.out.println("finish printing"); }else { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); } }
main函数
将spout的并行数从5改为1
builder.setSpout("spout", new RandomSentenceSpout(), 1);
将WordCount2添加到原始Topology中 Bolt
builder.setBolt(count2”, new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");
WordCount2 从Countt接收Bolt 通过default通过Bolt通过defalt stream发送的tuple,Spout通过SUCESS_STREAM发送的tuple,也就是说wordcount2将从两个stream接收数据。
编译编译修改后的源文件
cd $STROM_STARTERlein compile storm.starter
可能会出现以下异常信息,可以忽略。
Exception in thread "main" java.io.FileNotFoundException:Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:
运行
WordCountopology在local模式下运行修改后
java -cp $(lein classpath) storm.starter.WordCountTopology
假如一切正常,日志如下所示,线程的名称可能会有所不同。
moon 1score 1cow 1doctor 1over 1nature 1snow 1four 1keeps 1with 1a 1white 1dwarfs 1at 1the 4and 2i 1two 1away 1seven 2apple 1am 1an 1jumped 1day 1years 1ago 1
结果验证
与awk脚本的运行结果相比,WordCountopology的运行结果应该是一致的。
小技巧- awk脚本的执行结果存储为文件result1.log, WordCountopology输出中的单词统计部分存储在result2.log
- 用vim打开result1.log,sorting,保存结果;用vim打开result2.log,保存sorting。
- 然后用vimdiff进行比较 vimdiff result1.log result2.log