MQ 入门实践 (2)

MQ 入门实践

在 activemq.xml 添加节点 destinationInterceptors;

<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic prefix="consumer.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>

生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为类似 consumer.A.testTopic 这样来消费。

更多详细信息可参考:https://blog.csdn.net/java_collect/article/details/82154829

RocketMQ

是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

架构图示

MQ 入门实践

Name Server

名称服务器,类似于 Zookeeper 注册中心,提供 Broker 发现;

Broker

RocketMQ 的核心组件,绝大部分工作都在 Broker 中完成,接收请求,处理消费,消息持久化等;

Producer

消息生产方;

Consumer

消息消费方;

快速开始

安装后,依次启动 nameserver 和 broker,可以用 mqadmin 管理主题、集群和 broker 等信息;

https://segmentfault.com/a/1190000017841402

添加依赖;

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>

消息发送;

DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); Message msg = new Message( "producer-topic", "msg", "hello world".getBytes() ); //msg.setDelayTimeLevel(1); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); producer.shutdown();

delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

消息接收;

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("producer-topic", "msg"); consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start();

.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

Spring Boot 集成

添加依赖;

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>

添加 yaml 配置;

rocketmq: name-server: 127.0.0.1:9876 producer: group: producer

发送消息;

@Autowired private RocketMQTemplate mqTemplate; public void sendMessage(String topic, String tag, String message) { SendResult result = mqTemplate.syncSend(topic + ":" + tag, message); System.out.println(JSON.toJSONString(result)); }

接收消息;

@Component @RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test") public class MsgListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println(message); } } Console 控制台

RocketMQ 拓展包提供了管理控制台;

https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

MQ 入门实践

重复消费

产生原因:

生产者重复投递;

消息队列异常;

消费者异常消费;

怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性

通常基于本地消息表的方案实现,消息处理过便不再处理。

顺序消息

消息错乱的原因:

一个消息队列 queue,多个 consumer 消费;

一个 queue 对应一个 consumer,但是 consumer 多线程消费;

要保证消息的顺序消费,有三个关键点:

消息顺序发送

消息顺序存储

消息顺序消费

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwdgsw.html