SpringBoot整合Kafka和Storm (3)

这里为了方便,两种方法都编写了,通过主方法的args参数来进行控制。
Topology相关的配置说明在代码中的注释写的很详细了,这里我就不再多说了。
代码如下:

public void runStorm(String[] args) { // 定义一个拓扑 TopologyBuilder builder = new TopologyBuilder(); // 设置1个Executeor(线程),默认一个 builder.setSpout(Constants.KAFKA_SPOUT, new KafkaInsertDataSpout(), 1); // shuffleGrouping:表示是随机分组 // 设置1个Executeor(线程),和两个task builder.setBolt(Constants.INSERT_BOLT, new InsertBolt(), 1).setNumTasks(1).shuffleGrouping(Constants.KAFKA_SPOUT); Config conf = new Config(); //设置一个应答者 conf.setNumAckers(1); //设置一个work conf.setNumWorkers(1); try { // 有参数时,表示向集群提交作业,并把第一个参数当做topology名称 // 没有参数时,本地提交 if (args != null && args.length > 0) { logger.info("运行远程模式"); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { // 启动本地模式 logger.info("运行本地模式"); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TopologyApp", conf, builder.createTopology()); } } catch (Exception e) { logger.error("storm启动失败!程序退出!",e); System.exit(1); } logger.info("storm启动成功..."); }

好了,编写完了kafka和storm相关的代码之后,我们再来进行和SpringBoot的整合!

在进行和SpringBoot整合前,我们先要解决下一下几个问题。

1 在SpringBoot程序中如何提交storm的Topolgy?

storm是通过提交Topolgy来确定如何启动的,一般使用过运行main方法来启动,但是SpringBoot启动方式一般也是通过main方法启动的。所以应该怎么样解决呢?

解决思路:将storm的Topology写在SpringBoot启动的主类中,随着SpringBoot启动而启动。

实验结果:可以一起启动(按理来说也是可以的)。但是随之而来的是下一个问题,bolt和spout类无法使用spring注解。

2 如何让bolt和spout类使用spring注解?

解决思路:在了解到spout和bolt类是由nimbus端实例化,然后通过序列化传输到supervisor,再反向序列化,因此无法使用注解,所以这里可以换个思路,既然不能使用注解,那么就动态获取Spring的bean就好了。

实验结果:使用动态获取bean的方法之后,可以成功启动storm了。

3.有时启动正常,有时无法启动,动态的bean也无法获取?

解决思路:在解决了1、2的问题之后,有时出现问题3,找了很久才找到,是因为之前加入了SpringBoot的热部署,去掉之后就没出现了...。

上面的三个问题是我在整合的时候遇到的,其中解决办法在目前看来是可行的,或许其中的问题可能是因为其他的原因导致的,不过目前就这样整合之后,就没出现过其他的问题了。若上述问题和解决办法有不妥之后,欢迎批评指正!

解决了上面的问题之后,我们回到代码这块。
其中,程序的入口,也就是主类的代码在进行整合后如下:

@SpringBootApplication public class Application{ public static void main(String[] args) { // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件 ConfigurableApplicationContext context = SpringApplication.run(Application.class, args); GetSpringBean springBean=new GetSpringBean(); springBean.setApplicationContext(context); TopologyApp app = context.getBean(TopologyApp.class); app.runStorm(args); } }

动态获取bean的代码如下:

public class GetSpringBean implements ApplicationContextAware{ private static ApplicationContext context; public static Object getBean(String name) { return context.getBean(name); } public static <T> T getBean(Class<T> c) { return context.getBean(c); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if(applicationContext!=null){ context = applicationContext; } } }

主要的代码的介绍就到这里了,至于其它的,基本就和以前的一样了。

测试结果

成功启动程序之后,我们先调用接口新增几条数据到kafka

新增请求:

POST :8087/api/user {"name":"张三","age":20} {"name":"李四","age":10} {"name":"王五","age":5}

新增成功之后,我们可以使用xshell工具在kafka集群中查看数据。
输入:**kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning**

然后可以看到以下输出结果。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpggzj.html