配置 RabbitMQ 消息队列和 Stream 消息发送与接收的通道。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 # 配置 Eureka Server 注册中心 eureka: instance: prefer-ip-address: true # 是否使用 ip 地址注册 instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port client: service-url: # 设置服务注册中心地址 defaultZone: :8761/eureka/,:8762/eureka/接收消息
MessageConsumer.java
package com.example.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(Sink.class) public class MessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(Sink.INPUT) public void receive(String message) { System.out.println("message = " + message); } }启动类
StreamConsumerApplication.java
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamConsumerApplication { public static void main(String[] args) { SpringApplication.run(StreamConsumerApplication.class); } }测试
单元测试
MessageProducerTest.java
package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } }访问
启动消息消费者,运行单元测试,消息消费者控制台打印结果如下:
message = hello spring cloud streamRabbitMQ 界面如下:
自定义消息通道
创建消息通道
参考源码 Source.java 和 Sink.java 创建自定义消息通道。
自定义消息发送通道 MySource.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource { String MY_OUTPUT = "my_output"; @Output(MY_OUTPUT) MessageChannel myOutput(); }自定义消息接收通道 MySink.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink { String MY_INPUT = "my_input"; @Input(MY_INPUT) SubscribableChannel myInput(); }配置文件
消息生产者。
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 my_output: destination: my.message # 绑定的交换机名称
消息消费者。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 my_input: destination: my.message # 绑定的交换机名称代码重构
消息生产者 MyMessageProducer.java。
package com.example.producer; import com.example.channel.MySource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource.class) public class MyMessageProducer { @Autowired private MySource mySource; /** * 发送消息 * * @param message */ public void send(String message) { mySource.myOutput().send(MessageBuilder.withPayload(message).build()); } }