WordCount with Storm and Kafka

Kafka here is installed in a Windows environment.
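The post gives no startup steps. As a rough sketch, ZooKeeper, the broker, and a working topic can be brought up with the scripts Kafka ships under bin\windows (the topic name "test" and the partition count are assumptions, not from the original; on Kafka 2.2+ pass --bootstrap-server localhost:9092 instead of --zookeeper):

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --partitions 4 --replication-factor 1 --topic test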

Add the pom dependencies
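The original does not list the actual dependencies. A minimal sketch of what the code below needs, assuming Storm 1.1.x with the storm-kafka-client spout (versions are placeholders; match them to your cluster and broker):

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka-client</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>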


Write the code

Write SplitSentenceBolt

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        // The KafkaSpout's default translator emits (topic, partition, offset, key, value),
        // so index 4 is the message value, i.e. tuple.getStringByField("value").
        String sentence = tuple.getString(4);
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("words"));
    }
}

Write WordCountBolt

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("words");
        // String word = tuple.getString(0);
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        // Every occurrence is written back into the map; the entry for the same
        // word is overwritten, so the stored value is always the current count.
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

Write ReportBolt

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.counts.put(word, count);
        System.out.println("--------FINAL COUNTS--------");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + ":" + this.counts.get(key));
        }
        System.out.println("----------------------------");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // terminal bolt, emits nothing downstream
    }
}

Write the Topology

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MainTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // The broker address and topic were cut off in the original; "localhost:9092"
        // and "test" are placeholders, adjust them to your environment.
        KafkaSpoutConfig.Builder<String, String> kafkabuilder =
                KafkaSpoutConfig.builder("localhost:9092", "test");

        // set the consumer group the spout belongs to
        kafkabuilder.setGroupId("testgroup");
        // create the KafkaSpoutConfig
        KafkaSpoutConfig<String, String> build = kafkabuilder.build();
        // obtain the KafkaSpout from the KafkaSpoutConfig
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
        // receive data with four executors
        builder.setSpout("kafkaspout", kafkaSpout, 4);

        // builder.setBolt("printBolt", new PrintBolt(), 2).localOrShuffleGrouping("kafkaspout");

        builder.setBolt("split-bolt", new SplitSentenceBolt(), 2).setNumTasks(4).shuffleGrouping("kafkaspout");
        // Sometimes tuples carrying particular values must be routed to one specific
        // bolt instance; fieldsGrouping guarantees that all tuples with the same
        // "words" field value reach the same WordCountBolt instance.
        builder.setBolt("count-bolt", new WordCountBolt(), 2).fieldsGrouping("split-bolt", new Fields("words"));
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafkaspout", config, builder.createTopology());
    }
}
