接收原始消息 10086|10086@email.com 处理后并发送至 sms.message 和 email.message 交换机。
package com.example.consumer; import com.example.channel.MyProcessor; import com.example.producer.SmsAndEmailMessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageConsumer { private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class); @Autowired private SmsAndEmailMessageProducer smsAndEmailMessageProducer; /** * 接收原始消息,处理后并发送 * * @param sourceMessage */ @StreamListener(MyProcessor.SOURCE_MESSAGE) public void receive(String sourceMessage) { logger.info("原始消息接收成功,原始消息为:{}", sourceMessage); // 发送消息 电话号码 smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]); // 发送消息 邮箱地址 smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]); } }发送消息
发送电话号码 10086 和邮箱地址 10086@email.com 至 sms.message 和 email.message 交换机。
package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageProducer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送消息 电话号码 * * @param smsMessage */ public void sendSms(String smsMessage) { logger.info("电话号码消息发送成功,消息为:{}", smsMessage); myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build()); } /** * 发送消息 邮箱地址 * * @param emailMessage */ public void sendEmail(String emailMessage) { logger.info("邮箱地址消息发送成功,消息为:{}", emailMessage); myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build()); } }测试
单元测试
MessageProducerTest.java
package com.example; import com.example.producer.SourceMessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private SourceMessageProducer sourceMessageProducer; @Test public void testSendSource() { sourceMessageProducer.send("10086|10086@email.com"); } }访问
消息驱动微服务 A 控制台打印结果如下:
电话号码为:10086,调用短信发送服务,发送短信... 邮箱地址为:10086@email.com,调用邮件发送服务,发送邮件...
消息驱动微服务 B 控制台打印结果如下:
原始消息接收成功,原始消息为:10086|10086@email.com 电话号码消息发送成功,消息为:10086 邮箱地址消息发送成功,消息为:10086@email.com
RabbitMQ 界面如下:
下一篇我们讲解 Stream 如何实现消息分组和消息分区,记得关注噢~