Spring Cloud 系列之 Stream 消息驱动(一) (5)

  消息消费者 MyMessageConsumer.java。

package com.example.consumer; import com.example.channel.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink.class) public class MyMessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(MySink.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }

  

测试

  

单元测试

  

  MessageProducerTest.java

package com.example; import com.example.producer.MyMessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MyMessageProducer myMessageProducer; @Test public void testMySend() { myMessageProducer.send("hello spring cloud stream"); } }

  

访问

  

  启动消息消费者,运行单元测试,消息消费者控制台打印结果如下:

message = hello spring cloud stream

  RabbitMQ 界面如下:

Spring Cloud 系列之 Stream 消息驱动(一)

  

配置优化

  

  Spring Cloud 微服务开发之所以简单,除了官方做了许多彻底的封装之外还有一个优点就是约定大于配置。开发人员仅需规定应用中不符约定的部分,在没有规定配置的地方采用默认配置,以力求最简配置为核心思想。

简单理解就是:Spring 遵循了推荐默认配置的思想,当存在特殊需求时候,自定义配置即可否则无需配置。

  

  在 Spring Cloud Stream 中,@Output("output") 和 @Input("input") 注解的 value 默认即为绑定的交换机名称。所以自定义消息通道的案例我们就可以重构为以下方式。

  

创建消息通道

  

  参考源码 Source.java 和 Sink.java 创建自定义消息通道。

  自定义消息发送通道 MySource02.java

package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource02 { String MY_OUTPUT = "default.message"; @Output(MY_OUTPUT) MessageChannel myOutput(); }

  自定义消息接收通道 MySink02.java

package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink02 { String MY_INPUT = "default.message"; @Input(MY_INPUT) SubscribableChannel myInput(); }

  

配置文件

  

  消息生产者。

server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址

  

  消息消费者。

server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址

  

代码重构

  

  消息生产者 MyMessageProducer02.java。

package com.example.producer; import com.example.channel.MySource02; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource02.class) public class MyMessageProducer02 { @Autowired private MySource02 mySource02; /** * 发送消息 * * @param message */ public void send(String message) { mySource02.myOutput().send(MessageBuilder.withPayload(message).build()); } }

  

  消息消费者 MyMessageConsumer02.java。

package com.example.consumer; import com.example.channel.MySink02; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink02.class) public class MyMessageConsumer02 { /** * 接收消息 * * @param message */ @StreamListener(MySink02.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }

  

测试

  

单元测试

  

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzgps.html