springCloud学习5(Spring-Cloud-Stream事件驱动) (3)

  最后修改配置文件,为 input 通道指定 topic,配置如下:

spring: cloud: stream: bindings: input: destination: orgChangeTopic content-type: application/json # 定义将要消费消息的消费者组的名称 # 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的 # 一个副本会被该组的某个实例所消费 group: licensingGroup kafka: binder: zk-nodes: 192.168.226.5:2181 brokers: 192.168.226.5:9092

基本和发送的配置相同,只是这里是为input通道映射队列,然后还定义了一个组名,避免一个消息被重复消费。

  现在来多次访问localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制台打印数据从缓存中读取,如下所示:

缓存

然后再以 delete 访问localhost:5555/apis/org/organization/12清除缓存,再次访问 licensingservice 服务,结果如下:

自定义通道

  上面用的是Spring Cloud Stream自带的 input/output 通道,那么要如何自定义通道呢?下面以自定义customInput/customOutput通道为例。

自定义发数据通道 public interface CustomOutput { @Output("customOutput") MessageChannel out(); }

  对于每个自定义的发数据通道,需使用@OutPut 注解标记的返回 MessageChannel 类的方法。

自定义收数据通道 public interface CustomInput { @Input("customInput") SubscribableChannel in(); }

  同上,对应自定义的收数据通道,需要使用@Input 注解标记的返回 SubscribableChannel 类的方法。

结束

  看完本篇你应该已经能够在 Spring Cloud 中集成 Spring Cloud Stream 消息队列了,貌似这个也能用到普通的 spring boot 项目中,比直接集成 mq 更加的优雅。

2019,Fighting!

本篇原创发布于:FleyX 的个人博客

本篇所用全部代码:FleyX 的 github

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwsdw.html