Spring Cloud 系列之 Stream 消息驱动(一) (4)

  配置 RabbitMQ 消息队列和 Stream 消息发送与接收的通道。

server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 # 配置 Eureka Server 注册中心 eureka: instance: prefer-ip-address: true # 是否使用 ip 地址注册 instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port client: service-url: # 设置服务注册中心地址 defaultZone: :8761/eureka/,:8762/eureka/

  

接收消息

  

  MessageConsumer.java

package com.example.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(Sink.class) public class MessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(Sink.INPUT) public void receive(String message) { System.out.println("message = " + message); } }

  

启动类

  

  StreamConsumerApplication.java

package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamConsumerApplication { public static void main(String[] args) { SpringApplication.run(StreamConsumerApplication.class); } }

  

测试

  

单元测试

  

  MessageProducerTest.java

package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } }

  

访问

  

  启动消息消费者,运行单元测试,消息消费者控制台打印结果如下:

message = hello spring cloud stream

  RabbitMQ 界面如下:

Spring Cloud 系列之 Stream 消息驱动(一)

  

自定义消息通道

  

创建消息通道

  

  参考源码 Source.java 和 Sink.java 创建自定义消息通道。

  自定义消息发送通道 MySource.java

package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource { String MY_OUTPUT = "my_output"; @Output(MY_OUTPUT) MessageChannel myOutput(); }

  自定义消息接收通道 MySink.java

package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink { String MY_INPUT = "my_input"; @Input(MY_INPUT) SubscribableChannel myInput(); }

  

配置文件

  

  消息生产者。

server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 my_output: destination: my.message # 绑定的交换机名称

  

  消息消费者。

server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 my_input: destination: my.message # 绑定的交换机名称

  

代码重构

  

  消息生产者 MyMessageProducer.java。

package com.example.producer; import com.example.channel.MySource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource.class) public class MyMessageProducer { @Autowired private MySource mySource; /** * 发送消息 * * @param message */ public void send(String message) { mySource.myOutput().send(MessageBuilder.withPayload(message).build()); } }

  

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzgps.html