最后修改配置文件,为 input 通道指定 topic,配置如下:
spring: cloud: stream: bindings: input: destination: orgChangeTopic content-type: application/json # 定义将要消费消息的消费者组的名称 # 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的 # 一个副本会被该组的某个实例所消费 group: licensingGroup kafka: binder: zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092基本和发送的配置相同,只是这里是为input通道映射队列,然后还定义了一个组名,避免一个消息被重复消费。
现在来多次访问localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制台打印数据从缓存中读取,如下所示:
然后再以 delete 访问localhost:5555/apis/org/organization/12清除缓存,再次访问 licensingservice 服务,结果如下:
自定义通道上面用的是Spring Cloud Stream自带的 input/output 通道,那么要如何自定义通道呢?下面以自定义customInput/customOutput通道为例。
自定义发数据通道 public interface CustomOutput { @Output("customOutput") MessageChannel out(); }对于每个自定义的发数据通道,需使用@OutPut 注解标记的返回 MessageChannel 类的方法。
自定义收数据通道 public interface CustomInput { @Input("customInput") SubscribableChannel in(); }同上,对应自定义的收数据通道,需要使用@Input 注解标记的返回 SubscribableChannel 类的方法。
结束看完本篇你应该已经能够在 Spring Cloud 中集成 Spring Cloud Stream 消息队列了,貌似这个也能用到普通的 spring boot 项目中,比直接集成 mq 更加的优雅。
2019,Fighting!
本篇原创发布于:FleyX 的个人博客
本篇所用全部代码:FleyX 的 github