Spring Cloud 系列之 Stream 消息驱动(一) (7)

  接收原始消息 10086|10086@email.com 处理后并发送至 sms.message 和 email.message 交换机。

package com.example.consumer; import com.example.channel.MyProcessor; import com.example.producer.SmsAndEmailMessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageConsumer { private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class); @Autowired private SmsAndEmailMessageProducer smsAndEmailMessageProducer; /** * 接收原始消息,处理后并发送 * * @param sourceMessage */ @StreamListener(MyProcessor.SOURCE_MESSAGE) public void receive(String sourceMessage) { logger.info("原始消息接收成功,原始消息为:{}", sourceMessage); // 发送消息 电话号码 smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]); // 发送消息 邮箱地址 smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]); } }

  

发送消息

  

  发送电话号码 10086 和邮箱地址 10086@email.com 至 sms.message 和 email.message 交换机。

package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageProducer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送消息 电话号码 * * @param smsMessage */ public void sendSms(String smsMessage) { logger.info("电话号码消息发送成功,消息为:{}", smsMessage); myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build()); } /** * 发送消息 邮箱地址 * * @param emailMessage */ public void sendEmail(String emailMessage) { logger.info("邮箱地址消息发送成功,消息为:{}", emailMessage); myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build()); } }

  

测试

  

单元测试

  

  MessageProducerTest.java

package com.example; import com.example.producer.SourceMessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private SourceMessageProducer sourceMessageProducer; @Test public void testSendSource() { sourceMessageProducer.send("10086|10086@email.com"); } }

  

访问

  

  消息驱动微服务 A 控制台打印结果如下:

电话号码为:10086,调用短信发送服务,发送短信... 邮箱地址为:10086@email.com,调用邮件发送服务,发送邮件...

  

  消息驱动微服务 B 控制台打印结果如下:

原始消息接收成功,原始消息为:10086|10086@email.com 电话号码消息发送成功,消息为:10086 邮箱地址消息发送成功,消息为:10086@email.com

  

  RabbitMQ 界面如下:

Spring Cloud 系列之 Stream 消息驱动(一)

下一篇我们讲解 Stream 如何实现消息分组和消息分区,记得关注噢~

Spring Cloud 系列之 Stream 消息驱动(一)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzgps.html