MessageProducerTest.java
package com.example; import com.example.producer.MyMessageProducer02; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MyMessageProducer02 myMessageProducer02; @Test public void testMySend02() { myMessageProducer02.send("约定大于配置"); } }访问
启动消息消费者,运行单元测试,消息消费者控制台打印结果如下:
message = 约定大于配置RabbitMQ 界面如下:
短信邮件发送案例
一个消息驱动微服务应用可以既是消息生产者又是消息消费者。接下来模拟一个短信邮件发送的消息处理过程:
原始消息发送至 source.message 交换机;
消息驱动微服务应用通过 source.message 交换机接收原始消息,经过处理分别发送至 sms.message 和 email.message 交换机;
消息驱动微服务应用通过 sms.message 和 email.message 交换机接收处理后的消息并发送短信和邮件。
创建消息通道
发送原始消息,接收处理后的消息并发送短信和邮件的消息驱动微服务应用。
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息通道 */ public interface MyProcessor { String SOURCE_MESSAGE = "source.message"; String SMS_MESSAGE = "sms.message"; String EMAIL_MESSAGE = "email.message"; @Output(SOURCE_MESSAGE) MessageChannel sourceOutput(); @Input(SMS_MESSAGE) SubscribableChannel smsInput(); @Input(EMAIL_MESSAGE) SubscribableChannel emailInput(); }
接收原始消息,经过处理分别发送短信和邮箱的消息驱动微服务应用。
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息通道 */ public interface MyProcessor { String SOURCE_MESSAGE = "source.message"; String SMS_MESSAGE = "sms.message"; String EMAIL_MESSAGE = "email.message"; @Input(SOURCE_MESSAGE) MessageChannel sourceOutput(); @Output(SMS_MESSAGE) SubscribableChannel smsOutput(); @Output(EMAIL_MESSAGE) SubscribableChannel emailOutput(); }配置文件
约定大于配置,配置文件只修改端口和应用名称即可,其他配置一致。
spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
消息驱动微服务 A
发送消息
发送原始消息 10086|10086@email.com 至 source.message 交换机。
package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageProducer { private Logger logger = LoggerFactory.getLogger(SourceMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送原始消息 * * @param sourceMessage */ public void send(String sourceMessage) { logger.info("原始消息发送成功,原始消息为:{}", sourceMessage); myProcessor.sourceOutput().send(MessageBuilder.withPayload(sourceMessage).build()); } }接收消息
接收处理后的消息并发送短信和邮件。
package com.example.consumer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageConsumer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageConsumer.class); /** * 接收消息 电话号码 * * @param phoneNum */ @StreamListener(MyProcessor.SMS_MESSAGE) public void receiveSms(String phoneNum) { logger.info("电话号码为:{},调用短信发送服务,发送短信...", phoneNum); } /** * 接收消息 邮箱地址 * * @param emailAddress */ @StreamListener(MyProcessor.EMAIL_MESSAGE) public void receiveEmail(String emailAddress) { logger.info("邮箱地址为:{},调用邮件发送服务,发送邮件...", emailAddress); } }消息驱动微服务 B
接收消息