消息消费者 MyMessageConsumer.java。
package com.example.consumer; import com.example.channel.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink.class) public class MyMessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(MySink.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }测试
单元测试
MessageProducerTest.java
package com.example; import com.example.producer.MyMessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MyMessageProducer myMessageProducer; @Test public void testMySend() { myMessageProducer.send("hello spring cloud stream"); } }访问
启动消息消费者,运行单元测试,消息消费者控制台打印结果如下:
message = hello spring cloud streamRabbitMQ 界面如下:
配置优化
Spring Cloud 微服务开发之所以简单,除了官方做了许多彻底的封装之外还有一个优点就是约定大于配置。开发人员仅需规定应用中不符约定的部分,在没有规定配置的地方采用默认配置,以力求最简配置为核心思想。
简单理解就是:Spring 遵循了推荐默认配置的思想,当存在特殊需求时候,自定义配置即可否则无需配置。
在 Spring Cloud Stream 中,@Output("output") 和 @Input("input") 注解的 value 默认即为绑定的交换机名称。所以自定义消息通道的案例我们就可以重构为以下方式。
创建消息通道
参考源码 Source.java 和 Sink.java 创建自定义消息通道。
自定义消息发送通道 MySource02.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource02 { String MY_OUTPUT = "default.message"; @Output(MY_OUTPUT) MessageChannel myOutput(); }自定义消息接收通道 MySink02.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink02 { String MY_INPUT = "default.message"; @Input(MY_INPUT) SubscribableChannel myInput(); }配置文件
消息生产者。
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
消息消费者。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址代码重构
消息生产者 MyMessageProducer02.java。
package com.example.producer; import com.example.channel.MySource02; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource02.class) public class MyMessageProducer02 { @Autowired private MySource02 mySource02; /** * 发送消息 * * @param message */ public void send(String message) { mySource02.myOutput().send(MessageBuilder.withPayload(message).build()); } }
消息消费者 MyMessageConsumer02.java。
package com.example.consumer; import com.example.channel.MySink02; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink02.class) public class MyMessageConsumer02 { /** * 接收消息 * * @param message */ @StreamListener(MySink02.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }测试
单元测试