Spring Cloud 系列之 Stream 消息驱动(一) (2)

  

组成 说明
Middleware   中间件,支持 RabbitMQ 和 Kafka。  
Binder   目标绑定器,目标指的是 Kafka 还是 RabbitMQ。绑定器就是封装了目标中间件的包。如果操作的是 Kafka 就使用 spring-cloud-stream-binder-kafka,如果操作的是 RabbitMQ 就使用 spring-cloud-stream-binder-rabbit。  
@Input   注解标识输入通道,接收(消息消费者)的消息将通过该通道进入应用程序。  
@Output   注解标识输出通道,发布(消息生产者)的消息将通过该通道离开应用程序。  
@StreamListener   监听队列,消费者的队列的消息接收。  
@EnableBinding   注解标识绑定,将信道 channel 和交换机 exchange 绑定在一起。  

  

工作原理

  

  通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的 Binder 绑定器而不需要修改任何应用逻辑。

  

Spring Cloud 系列之 Stream 消息驱动(一)

  

  该模型图中有如下几个核心概念:

Source:当需要发送消息时,我们就需要通过 Source.java,它会把我们所要发送的消息进行序列化(默认转换成 JSON 格式字符串),然后将这些数据发送到 Channel 中;

Sink:当我们需要监听消息时就需要通过 Sink.java,它负责从消息通道中获取消息,并将消息反序列化成消息对象,然后交给具体的消息监听处理;

Channel:通常我们向消息中间件发送消息或者监听消息时需要指定主题(Topic)和消息队列名称,一旦我们需要变更主题的时候就需要修改消息发送或消息监听的代码。通过 Channel 对象,我们的业务代码只需要对应 Channel 就可以了,具体这个 Channel 对应的是哪个主题,可以在配置文件中来指定,这样当主题变更的时候我们就不用对代码做任何修改,从而实现了与具体消息中间件的解耦;

Binder:通过不同的 Binder 可以实现与不同的消息中间件整合,Binder 提供统一的消息收发接口,从而使得我们可以根据实际需要部署不同的消息中间件,或者根据实际生产中所部署的消息中间件来调整我们的配置。

  

环境准备

  

  stream-demo 聚合工程。SpringBoot 2.2.4.RELEASE、Spring Cloud Hoxton.SR1。

RabbitMQ:消息队列

eureka-server:注册中心

eureka-server02:注册中心

  

Spring Cloud 系列之 Stream 消息驱动(一)

  

入门案例

  

  点击链接观看:Stream 入门案例视频(获取更多请关注公众号「哈喽沃德先生」)

  

消息生产者

  

创建项目

  

  在 stream-demo 项目下创建 stream-producer 子项目。

  

添加依赖

  

  要使用 RabbitMQ 绑定器,可以通过使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>

  或者使用 Spring Cloud Stream RabbitMQ Starter:

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

  

  完整依赖如下:

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 "> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>stream-producer</artifactId> <version>1.0-SNAPSHOT</version> <!-- 继承父依赖 --> <parent> <groupId>com.example</groupId> <artifactId>stream-demo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <!-- 项目依赖 --> <dependencies> <!-- netflix eureka client 依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!-- spring cloud stream binder rabbit 绑定器依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <!-- spring boot test 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </project>

  

配置文件

  

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzgps.html