这个是在 Windows 环境下安装的 Kafka
下载pom依赖
编写代码
编写SplitSentenceBolt
/**
 * Bolt that splits an incoming sentence into words and emits one tuple per word.
 *
 * <p>The sentence is read from position {@link #SENTENCE_INDEX} of the input tuple, split on
 * single space characters, and each piece is emitted under the output field {@code "words"}.
 *
 * <p>Every emitted tuple is anchored to the input tuple and the input is acked once it has been
 * processed. {@code BaseRichBolt} does not ack automatically; without the explicit ack a
 * reliable spout would see every tuple time out and replay it.
 */
public class SplitSentenceBolt extends BaseRichBolt {
    /**
     * Position of the sentence inside the incoming tuple.
     * NOTE(review): presumably the Kafka record value, assuming the spout emits
     * (topic, partition, offset, key, value) -- confirm against the spout's record translator.
     */
    private static final int SENTENCE_INDEX = 4;

    /** Collector handed over by Storm in {@link #prepare}; used to emit and ack tuples. */
    private OutputCollector collector;

    /**
     * Stores the collector for later use in {@link #execute}.
     *
     * @param map              topology configuration (unused)
     * @param topologyContext  topology context (unused)
     * @param outputCollector  collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Splits the sentence carried by {@code tuple} on spaces and emits each word.
     *
     * @param tuple input tuple whose element at {@link #SENTENCE_INDEX} holds the sentence
     */
    @Override
    public void execute(Tuple tuple) {
        //String sentence = tuple.getStringByField("sentence");
        String sentence = tuple.getString(SENTENCE_INDEX);
        // A null payload carries no words; skip splitting but still ack below.
        if (sentence != null) {
            for (String word : sentence.split(" ")) {
                // Anchor to the input so a downstream failure triggers a replay of this tuple.
                collector.emit(tuple, new Values(word));
            }
        }
        // Explicit ack is required for BaseRichBolt.
        collector.ack(tuple);
    }

    /**
     * Declares the single output field {@code "words"}.
     *
     * @param outputFieldsDeclarer declarer used to register the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("words"));
    }
}
编写WordCountBolt
/**
 * Bolt that keeps a running count per word and emits the updated count for every input.
 *
 * <p>Reads the {@code "words"} field of each input tuple, increments that word's counter in an
 * in-memory map, and emits a {@code ("word", "count")} tuple with the new total.
 *
 * <p>Emitted tuples are anchored to the input and the input is acked after processing;
 * {@code BaseRichBolt} does not ack automatically.
 */
public class WordCountBolt extends BaseRichBolt {
    /** Collector handed over by Storm in {@link #prepare}; used to emit and ack tuples. */
    private OutputCollector collector;

    /** Running totals, keyed by word. Created in {@link #prepare}. */
    private Map<String, Long> counts;

    /**
     * Stores the collector and creates an empty count table.
     *
     * @param map              topology configuration (unused)
     * @param topologyContext  topology context (unused)
     * @param outputCollector  collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<>();
    }

    /**
     * Increments the counter for the word carried by {@code tuple} and emits the new total.
     *
     * @param tuple input tuple carrying the word in its {@code "words"} field
     */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("words");
        // String word = tuple.getString(0);
        Long previous = this.counts.get(word);
        long updated = (previous == null) ? 1L : previous + 1L;
        // Putting the same word again overwrites the old entry, so the map always holds
        // the latest total for each word.
        this.counts.put(word, updated);
        // Anchor to the input so a downstream failure triggers a replay of this tuple.
        this.collector.emit(tuple, new Values(word, updated));
        // Explicit ack is required for BaseRichBolt.
        this.collector.ack(tuple);
    }

    /**
     * Declares the output fields {@code "word"} and {@code "count"}.
     *
     * @param outputFieldsDeclarer declarer used to register the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
编写ReportBolt
public class ReportBolt extends BaseRichBolt {
private HashMap
@Override
public void execute(Tuple input) {
String word=input.getStringByField("word");
Long count=input.getLongByField("count");
this.counts.put(word, count);
System.out.println("--------FINAL COUNTS--------");
List<String> keys=new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for(String key:keys){
System.out.println(key+":"+this.counts.get(key));
}
System.out.println("----------------------------");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
编写Topology
public class MainTopology {
/**
 * Builds the word-count topology and submits it to an in-process {@code LocalCluster}.
 *
 * <p>Wiring: "kafkaspout" -> "split-bolt" (shuffle grouping) -> "count-bolt" (fields grouping
 * on "words") -> "report-bolt" (global grouping). The topology is submitted under the name
 * "kafkaspout".
 *
 * @param args command-line arguments (unused)
 * @throws Exception declared by the signature; propagated from the Storm calls below
 */
public static void main(String[] args)throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// NOTE(review): the declaration on the next line appears truncated -- the variable name
// `kafkabuilder` (used below), its type arguments and its initializer are missing.
// Restore it before compiling; the broker address and topic are not visible here.
KafkaSpoutConfig.Builder
// Set the Kafka consumer group this spout belongs to.
kafkabuilder.setGroupId("testgroup");
// Create the KafkaSpoutConfig.
KafkaSpoutConfig<String, String> build = kafkabuilder.build();
// Create the KafkaSpout from the KafkaSpoutConfig.
KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(build);
// Use four threads to receive data.
builder.setSpout("kafkaspout",kafkaSpout,4);
// builder.setBolt("printBolt", new PrintBolt(),2).localOrShuffleGrouping("kafkaspout");
builder.setBolt("split-bolt",new SplitSentenceBolt(),2).setNumTasks(4).shuffleGrouping("kafkaspout");
// Sometimes tuples carrying particular data must be routed to a specific bolt instance.
// fieldsGrouping is used here so that all tuples with the same "words" field value are
// routed to the same WordCountBolt instance.
builder.setBolt("count-bolt",new WordCountBolt(),2).fieldsGrouping("split-bolt",new Fields("words"));
// globalGrouping sends every tuple from "count-bolt" to a single "report-bolt" task.
builder.setBolt("report-bolt",new ReportBolt()).globalGrouping("count-bolt");
Config config=new Config();
config.setDebug(false);
config.setNumWorkers(2);
// Run in-process on a LocalCluster rather than submitting to a remote cluster.
LocalCluster cluster =new LocalCluster();
cluster.submitTopology("kafkaspout",config,builder.createTopology());
}