这里为了方便,两种方法都编写了,通过主方法的args参数来进行控制。
Topology相关的配置说明在代码中的注释写的很详细了,这里我就不再多说了。
代码如下:
好了,编写完了kafka和storm相关的代码之后,我们再来进行和SpringBoot的整合!
在进行和SpringBoot整合前,我们先要解决下一下几个问题。
1 在SpringBoot程序中如何提交storm的Topolgy?
storm是通过提交Topolgy来确定如何启动的,一般使用过运行main方法来启动,但是SpringBoot启动方式一般也是通过main方法启动的。所以应该怎么样解决呢?
解决思路:将storm的Topology写在SpringBoot启动的主类中,随着SpringBoot启动而启动。
实验结果:可以一起启动(按理来说也是可以的)。但是随之而来的是下一个问题,bolt和spout类无法使用spring注解。
2 如何让bolt和spout类使用spring注解?
解决思路:在了解到spout和bolt类是由nimbus端实例化,然后通过序列化传输到supervisor,再反向序列化,因此无法使用注解,所以这里可以换个思路,既然不能使用注解,那么就动态获取Spring的bean就好了。
实验结果:使用动态获取bean的方法之后,可以成功启动storm了。
3.有时启动正常,有时无法启动,动态的bean也无法获取?
解决思路:在解决了1、2的问题之后,有时出现问题3,找了很久才找到,是因为之前加入了SpringBoot的热部署,去掉之后就没出现了...。
上面的三个问题是我在整合的时候遇到的,其中解决办法在目前看来是可行的,或许其中的问题可能是因为其他的原因导致的,不过目前就这样整合之后,就没出现过其他的问题了。若上述问题和解决办法有不妥之后,欢迎批评指正!
解决了上面的问题之后,我们回到代码这块。
其中,程序的入口,也就是主类的代码在进行整合后如下:
动态获取bean的代码如下:
public class GetSpringBean implements ApplicationContextAware{ private static ApplicationContext context; public static Object getBean(String name) { return context.getBean(name); } public static <T> T getBean(Class<T> c) { return context.getBean(c); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if(applicationContext!=null){ context = applicationContext; } } }主要的代码的介绍就到这里了,至于其它的,基本就和以前的一样了。
测试结果成功启动程序之后,我们先调用接口新增几条数据到kafka
新增请求:
POST :8087/api/user {"name":"张三","age":20} {"name":"李四","age":10} {"name":"王五","age":5}新增成功之后,我们可以使用xshell工具在kafka集群中查看数据。
输入:**kafka-console-consumer.sh --zookeeper master:2181 --topic USER_TOPIC --from-beginning**
然后可以看到以下输出结果。