新建三个子模块分别对应于消息的生产者和消费者:
模块名 微服务功能cloud-stream-rabbitmq-provider8801 生产者,发送消息模块
cloud-stream-rabbitmq-consumer8802 消费者,接收消息模块
cloud-stream-rabbitmq-consumer8803 消费者,接收消息模块
2.1 消息驱动之消息生产者
新建Module:cloud-stream-rabbitmq-provider8801作为消息的生产者用来发送消息,在其POM文件中除引入web、actuator、eureka-client等必要启动器外,还需要引入SpringCloud Stream对应实现RabbitMQ的启动器依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>编写其配置文件application.yml:
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息 defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: mpolaris.top port: 5672 username: admin password: 1234321 bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称,OUTPUT表示这是消息的发送方 # 表示要使用的Exchange名称定义 destination: testExchange # 设置消息类型,本次为json,文本则设置“text/plain” content-type: application/json # 设置要绑定的消息服务的具体设置 default-binder: defaultRabbit eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: :7001/eureka instance: # 设置心跳的时间间隔(默认是30秒) lease-renewal-interval-in-seconds: 2 # 如果现在超过了5秒的间隔(默认是90秒) lease-expiration-duration-in-seconds: 5 # 在信息列表时显示主机名称yml instance-id: send-8801.com # 访问的路径变为IP地址 prefer-ip-address: true编写其主启动类
编写业务类,在业务类中分别要编写 发送消息接口 及其 实现类,并在发送接口消息的实现类中 添加 @EnableBinding 注解 用来绑定消息的推送管道,消息生产者绑定的消息推送管道为 org.springframework.cloud.stream.messaging.Source:
public interface IMessageProvider { public String send(); } import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import javax.annotation.Resource; import java.util.UUID; /** * @Author polaris * @Date 2021/3/4 21:46 */ @EnableBinding(Source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; //消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); //发送消息 System.out.println("==> serial:" + serial); return null; } }注意我们在service的实现类中不再需要@Service注解,因为这个service不再是传统意义上的和Controller、DAO数据等进行交互的service,而是要绑定绑定器打交道的service。
然后编写其业务层的Controller:
@RestController public class SendMessageController { @Autowired private IMessageProvider messageProvider; @GetMapping("/sendMessage") public String sendMessage() { return messageProvider.send(); } }启动服务注册中心后和RabbitMQ后,启动消息生产者微服务,我们在RabbitMQ的控制面板中可以看见多出了一个名为testExchange的交换机,这个交换机恰恰就是我们之前在配置文件中配置的交换机名字testExchange。
然后我们访问 :8801/sendMessage 使用消息生产者微服务发送消息,在其微服务后台我们看到了打印的消息。
在RabbitMQ的控制面板中我们也看到了确实发送了消息。
2.2 消息驱动之消息消费者新建Module:cloud-stream-rabbitmq-consumer8802/8803作为消息的生产者用来接收消息,其POM文件中引入的启动器依赖和消息生产者微服务的依赖几乎相同,然后编写其配置文件application.yml,其配置文件的书写和消息生产者的几乎一致,特别需要注意的是,消息生产者微服务用到的通道为OUTPUT,而消息消费者微服务用到的通道为INPUT,其他的配置文件信息就只需要注意端口号、注册服务名的区别即可:
spring: cloud: bindings: input: # 这个名字是一个通道的名称,INPUT表示消息消费者编写主启动类
编写消息消费者的业务类,由于是消费者,所以只需要编写其Controller即可,在其Controller上同样需要添加 @EnableBinding 注解用来绑定消息的推送管道,消息消费者绑定的消息推送管道为import org.springframework.cloud.stream.messaging.Sink,在接收消息的方法中需要使用 @StreamListner 注解来监听其绑定的消息推送管道:
@Component @EnableBinding(Sink.class) public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消费者" + serverPort + "号,收到消息:" + message.getPayload()); } }然后启动消息发送消费者服务,用生产者发送消息,我们可以发现在消费者端可以成功接收到消息。
3. 分组消费和持久化 3.1 重复消费问题当生产者发送消息后,此时的我们的消费者都接受了消息并进行了消费,也就是说同一条消息被多个消息消费者所消费。