在 activemq.xml 添加节点 destinationInterceptors;
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic prefix="consumer.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为类似 consumer.A.testTopic 这样来消费。
更多详细信息可参考:https://blog.csdn.net/java_collect/article/details/82154829
RocketMQ是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
架构图示
Name Server
名称服务器,类似于 Zookeeper 注册中心,提供 Broker 发现;
Broker
RocketMQ 的核心组件,绝大部分工作都在 Broker 中完成,接收请求,处理消费,消息持久化等;
Producer
消息生产方;
Consumer
消息消费方;
快速开始安装后,依次启动 nameserver 和 broker,可以用 mqadmin 管理主题、集群和 broker 等信息;
https://segmentfault.com/a/1190000017841402
添加依赖;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>消息发送;
DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); Message msg = new Message( "producer-topic", "msg", "hello world".getBytes() ); //msg.setDelayTimeLevel(1); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); producer.shutdown();delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。
消息接收;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("producer-topic", "msg"); consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start();.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876
Spring Boot 集成添加依赖;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>添加 yaml 配置;
rocketmq: name-server: 127.0.0.1:9876 producer: group: producer发送消息;
@Autowired private RocketMQTemplate mqTemplate; public void sendMessage(String topic, String tag, String message) { SendResult result = mqTemplate.syncSend(topic + ":" + tag, message); System.out.println(JSON.toJSONString(result)); }接收消息;
@Component @RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test") public class MsgListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println(message); } } Console 控制台RocketMQ 拓展包提供了管理控制台;
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
重复消费产生原因:
生产者重复投递;
消息队列异常;
消费者异常消费;
怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性。
通常基于本地消息表的方案实现,消息处理过便不再处理。
顺序消息消息错乱的原因:
一个消息队列 queue,多个 consumer 消费;
一个 queue 对应一个 consumer,但是 consumer 多线程消费;
要保证消息的顺序消费,有三个关键点:
消息顺序发送
消息顺序存储
消息顺序消费