Storm Download and Installation

Install Storm by following Lin Ziyu's tutorial; once the installation succeeds, Nimbus and the Supervisor start normally.
A WordCount Application Based on Storm
Implementation Principle

First, recall Storm's basic components: a running application is a topology, a directed graph in which spouts act as data sources and bolts consume and process the streams; tuples flow between them according to stream groupings.
The wordcount topology structure: RandomSentenceSpout ("spout") → SplitSentence ("split") → WordCount ("count") → GlobalWordCount ("globalcount").
Code
First, update the pom.xml configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="https://maven.apache.org/POM/4.0.0"
         xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="https://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zchi</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
        </dependency>
    </dependencies>
</project>
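With this pom in place, mvn package builds the project (assuming Maven is installed). One caveat not in the original: when submitting to a real cluster rather than running in a LocalCluster, the storm-core dependency is conventionally given <scope>provided</scope>, since the cluster already supplies Storm's jars.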
The WordCount.java file:

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/*
 * WordCountTopologyAllInJava (word count).
 * @author zhangchi
 */
public class wordcount {

    // A spout that produces the data; extends BaseRichSpout.
    public static class RandomSentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector _collector;
        private Random _rand;
        private Map<String, Values> pending;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
            pending = new HashMap<String, Values>();
        }

        @Override
        public void nextTuple() {
            // Sleep a while before emitting the next tuple.
            Utils.sleep(100);
            // Candidate sentences.
            String[] sentences = new String[]{
                    "the cow jumped over the moon",
                    "an apple a day keeps the doctor away",
                    "four score and seven years ago",
                    "snow white and the seven dwarfs",
                    "i am at two with nature"};
            // Pick one sentence at random.
            String sentence = sentences[_rand.nextInt(sentences.length)];
            Values tmpValues = new Values(sentence);
            String msgID = UUID.randomUUID().toString();
            pending.put(msgID, tmpValues);
            // Emit the sentence to the bolts; each tuple carries a unique message id.
            _collector.emit(tmpValues, msgID);
        }

        // Ack callback: remove successfully processed tuples from the pending map.
        @Override
        public void ack(Object id) {
            System.out.println("Msg:" + id + " send successful!");
            pending.remove(id);
        }

        // Fail callback: re-emit the tuple when processing failed.
        @Override
        public void fail(Object id) {
            System.out.println("Msg:" + id + " send failed, will try again!");
            Values failedMsg = pending.get(id);
            _collector.emit(failedMsg, id);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare one output field, "word".
            declarer.declare(new Fields("word"));
        }
    }

    // A bolt that splits sentences into words.
    public static class SplitSentence extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare one output field, "word".
            declarer.declare(new Fields("word"));
        }

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            collector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            // Receive a sentence.
            String sentence = tuple.getString(0);
            // Split the sentence into words.
            StringTokenizer iter = new StringTokenizer(sentence);
            // Emit each word.
            while (iter.hasMoreElements()) {
                collector.emit(new Values(iter.nextToken()));
            }
            // Ack the input tuple.
            collector.ack(tuple);
        }
    }

    // A bolt that counts words.
    public static class WordCount extends BaseBasicBolt {
        Map<String, Long> counts = new HashMap<String, Long>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Receive a word.
            String word = tuple.getString(0);
            // Look up the current count for this word.
            Long count = counts.get(word);
            if (count == null) count = 0L;
            // Increment the count.
            count++;
            // Put the word and its updated count back into the map.
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare two output fields, "word" and "count".
            declarer.declare(new Fields("word", "count"));
        }
    }

    // A global bolt that collects the final per-word counts and the overall total.
    public static class GlobalWordCount extends BaseBasicBolt {
        Map<String, Long> result = new HashMap<String, Long>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String word = tuple.getStringByField("word");
            Long count = tuple.getLongByField("count");
            result.put(word, count);
        }

        @Override
        public void cleanup() {
            System.out.println("---------------------------------Final Result----------------------------------------------");
            long totalCount = 0;
            for (String key : result.keySet()) {
                long count = result.get(key);
                System.out.println("---------------------------------Word:" + key + " Count:" + count);
                totalCount += count;
            }
            System.out.println("---------------------------------TotalCount:" + totalCount);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }
    }

    public static void main(String[] args) throws Exception {
        // Build the topology.
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout, named "spout", with parallelism 2.
        builder.setSpout("spout", new RandomSentenceSpout(), 2);
        // Set the "split" bolt, parallelism 8; it shuffle-groups on the spout's stream.
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        // Set the "count" bolt, parallelism 12; it fields-groups on the "word" field of "split".
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
        // Set the "globalcount" bolt; it global-groups on the "count" stream.
        builder.setBolt("globalcount", new GlobalWordCount()).globalGrouping("count");

        Config conf = new Config();
        conf.setDebug(false);
        // To submit to a real cluster instead, use:
        // if (args != null && args.length > 0) {
        //     conf.setNumWorkers(3);
        //     StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        // }
        conf.setMaxTaskParallelism(3);
        // Run on a local pseudo-cluster.
        LocalCluster cluster = new LocalCluster();
        // Submit the topology (named "word-count") to the local pseudo-cluster.
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(30000);
        cluster.killTopology("word-count");
        cluster.shutdown();
    }
}
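A note on the spout's ack/fail callbacks: emitting each tuple together with a message id turns on Storm's at-least-once delivery, in which acker tasks track the tuple tree downstream and eventually call ack or fail on the originating spout. Two related knobs (both standard setters on backtype.storm.Config; the values here are illustrative, not from the original) control that machinery:

// Number of acker tasks tracking tuple trees; 0 disables reliability
// tracking entirely, so ack/fail would never be invoked.
conf.setNumAckers(1);
// A tuple tree not fully acked within this many seconds is treated as
// failed, and the spout's fail() re-emits the message.
conf.setMessageTimeoutSecs(30);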
The final output is:

---------------------------------Final Result----------------------------------------------
---------------------------------Word:away Count:111
---------------------------------Word:ago Count:102
---------------------------------Word:jumped Count:114
---------------------------------Word:seven Count:199
---------------------------------Word:cow Count:114
---------------------------------Word:two Count:106
---------------------------------Word:years Count:102
---------------------------------Word:dwarfs Count:97
---------------------------------Word:score Count:102
---------------------------------Word:apple Count:111
---------------------------------Word:white Count:97
---------------------------------Word:four Count:102
---------------------------------Word:and Count:199
---------------------------------Word:keeps Count:111
---------------------------------Word:day Count:111
---------------------------------Word:over Count:114
---------------------------------Word:a Count:111
---------------------------------Word:nature Count:106
---------------------------------Word:i Count:106
---------------------------------Word:am Count:106
---------------------------------Word:an Count:111
---------------------------------Word:the Count:436
---------------------------------Word:doctor Count:111
---------------------------------Word:with Count:106
---------------------------------Word:moon Count:114
---------------------------------Word:at Count:106
---------------------------------Word:snow Count:97
---------------------------------TotalCount:3402

The numbers are self-consistent: "seven" occurs in two sentences whose other words were counted 102 and 97 times (199 in total), and "the" occurs twice in the first sentence (2 × 114) plus once each in two others (111 + 97), giving 436.
Notes on running:

The code above uses a LocalCluster, so everything runs inside a single JVM. To run against a real cluster, start Nimbus and at least one Supervisor first:

cd /usr/local/storm
./bin/storm nimbus

/usr/local/storm/bin/storm supervisor
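Once Nimbus and the Supervisor are up, the packaged topology can be submitted with Storm's standard jar command after switching main() to the commented-out StormSubmitter branch, e.g. ./bin/storm jar target/storm_study-1.0-SNAPSHOT.jar wordcount word-count (the jar path follows from the pom above; the trailing topology name is an assumed args[0]).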
Writing Storm Output to HDFS (to be continued)
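The original post ends before covering this step. As a hedged sketch only: the storm-hdfs module provides an HdfsBolt that appends incoming tuples to files in HDFS. The snippet below assumes a storm-hdfs dependency matching the Storm version and a NameNode at hdfs://localhost:9000; the output path /storm/wordcount/ is likewise hypothetical.

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

// A minimal sketch, assuming storm-hdfs is on the classpath and HDFS is
// reachable at hdfs://localhost:9000 (both assumptions, not from the original).
public class HdfsSink {
    public static HdfsBolt buildHdfsBolt() {
        // Write each (word, count) tuple as a "word|count" line.
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
        // Flush buffered data to HDFS every 1000 tuples.
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // Roll over to a new file once the current one reaches 5 MB.
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
        // Output directory inside HDFS (hypothetical path).
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/wordcount/");
        return new HdfsBolt()
                .withFsUrl("hdfs://localhost:9000") // assumed NameNode address
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }
}

Wired into the topology above, it would sit downstream of the counter: builder.setBolt("hdfs", HdfsSink.buildHdfsBolt()).shuffleGrouping("count");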