RocketMq灰皮书(三)------MQ使用

RocketMq灰皮书(三)------MQ使用

在使用MQ之前,我们回顾一下前两篇博文的内容.

我们大致了解了RocketMQ的四个概念,分别是:Producer,Consumer,Message和Broker

我们在本地的Windows10系统上,部署了RocketMQ和其后台系统

在本篇博文中,我们会使用使用SpringBoot构建两个微服务,一个作为生产者,一个作为消费者,通过RocketMQ传递消息,了解在Java中使用RocketMQ的方法.

一. SpringBoot整合RocketMQ收发消息

在灰皮书第一篇文章中,我画了下面这个图:

image-20210209115705604

现在我们本地的RocketMQ也部署起来了,接下来我们创建两个微服务通过MQ来收发消息,实现基本的流程.

1. 微服务构建

首先我们创建两个基于SpringBoot的微服务,分别是:

rocketmq-consumer消息消费者

rocketmq-producer消息生产者

image-20210218093723919

两个服务里面,rocketmq-consumer的端口号是2001,rocketmq-producer的端口号是2002

2. 微服务启动测试

分别在两个微服务写两个测试方法,启动测试:

rocketmq-consumer

@RestController public class ConsumerController { @GetMapping("/consumer") public String index() { return "rocketmq-consumer"; } }

rocketmq-producer

@RestController public class ProducerController { @GetMapping("/producer") public String index() { return "rocketmq-producer"; } }

启动测试,两个接口都成功访问.

根据我们最上面的图,服务A发送消息到服务B,在这里,我们用rocketmq-producer来发送消息,消息发送到rocketmq以后,由服务Brocketmq-consumer消费消息.

3. 生产者发送消息

使用rocketmq发送消息有很多种方式,因为我们使用的是SpringBoot,这里直接使用官方提供的rocketmq-spring-boot-starter包来开发

在github上有个项目:RocketMQ-Spring

它就是RocketMq官方提供的整合了SpringBoot的rocketmq工具包,git地址如下:https://github.com/apache/rocketmq-spring

当然,你也可以使用原生的rocketmq-client包,在官方的示例中,使用的就是这种方式,具体可以查看官方文档,下面我们直接使用rocketmq-spring-boot-starter来发送消息.

我们可以看到有很多的版本可以用:

image-20210218100739239

这里我们使用2.0.3这个版本吧,具体的官方细节可以查看https://github.com/apache/rocketmq-spring/blob/release-2.0.3/README_zh_CN.md

3.1发送String消息

首先是pom坐标:

<!--add dependency in pom.xml--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.3</version> </dependency>

然后再rocketmq-producer的配置文件中配置rocketmq的name-server和group

## application.properties rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=producer

rocketmq-spring-boot-starter中提供了一个RocketMQTemplate来方便我们发送消息,我们可以直接注入这个类来使用.

RocketMQTemplate有send方法和convertAndSend方法,都可以用来发送消息,区别是,前者的方法入参是rocketmq规定的Message类型,而后者可以发送对象,并且帮我们转换,源码如下:

/** * Send a message to the given destination. * @param destination the target destination * @param message the message to send */ void send(D destination, Message<?> message) throws MessagingException; /** * Convert the given Object to serialized form, possibly using a * {@link org.springframework.messaging.converter.MessageConverter}, * wrap it as a message and send it to a default destination. * @param payload the Object to use as payload */ void convertAndSend(Object payload) throws MessagingException;

下面我们直接发送消息到mq

@Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/producer") public String index() { rocketMQTemplate.convertAndSend("test-topic", "消息发送成功!"); return "rocketmq-producer"; }

convertAndSend方法有两个参数,第一个参数是消息要发送到的topic,也就是目的地,第二个参数就是消息本身,至于topic到底是什么,这个我们后面详细来说,我们只需要知道,我们的消息发送到了rocketmq的一个叫做test-topic的地方即可.

并且,由于我们在灰皮书第二章的时候,启动mq的时候,指定了autoCreateTopicEnable=true,也就是说,我们使用RocketMQTemplate发送的消息,就算topic之前不存在,rocket也会帮我们创建好.

编码完成,重启项目,我们只要访问:2002/producer就会发送消息到mq,我们可以通过rocketmq-console查看我们发送的消息

可以看到mq自动为我们创建了topic:

image-20210218113616038

在message页签,可以查看到我们刚才发送的消息:

image-20210218113722908

详细的消息内容:

image-20210218113741161

3.2发送对象

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyffy.html