我们把上面的程序稍作修改,只把订阅的Topic和发送消息时消息的Topic改为broker-a-topic即可。代码在这里就不给大家重复写了,重启一下程序,发送消息看看日志吧。
生产者端的日志如下:
i=0 BrokerName:broker-a i=1 BrokerName:broker-a i=2 BrokerName:broker-a i=3 BrokerName:broker-a i=4 BrokerName:broker-a我们看到5个消息都发送到了broker-a中,再来看看消费端的日志,
this is simpleMQ,my NO is 0---Wed Jun 10 14:00:28 CST 2020 this is simpleMQ,my NO is 2---Wed Jun 10 14:00:29 CST 2020 this is simpleMQ,my NO is 3---Wed Jun 10 14:00:29 CST 2020 this is simpleMQ,my NO is 4---Wed Jun 10 14:00:29 CST 2020 this is simpleMQ,my NO is 1---Wed Jun 10 14:00:29 CST 2020消费的顺序还是乱的,这是怎么回事?消息都在broker-a中了,为什么消费时顺序还是乱的?程序有问题吗?review了好几遍没有发现问题。
问题排查问题卡在这个地方,卡了好长时间,最后在官网的示例中发现,它在发送消息时,使用了一个MessageQueueSelector,我们也实现一下试试吧,改造一下发送端的程序,如下:
SendResult sendResult = defaultMQProducer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return mqs.get(0); } },i);在发送的方法中,我们实现了MessageQueueSelector接口中的select方法,这个方法有3个参数,mq的集合,发送的消息msg,和我们传入的参数,这个参数就是最后的那个变量i,大家不要漏了。这个select方法需要返回的是MessageQueue,也就是mqs变量中的一个,那么mqs中有多少个MessageQueue呢?我们猜测是2个,因为我们只有broker-a和broker-b,到底是不是呢?我们打断点看一下,
MessageQueue有8个,并且brokerName都是broker-a,原来Broker和MessageQueue不是相同的概念,之前我们都理解错了。我们可以用下面的方式理解,
集群 --------》 Broker ------------》 MessageQueue
一个RocketMQ集群里可以有多个Broker,一个Broker里可以有多个MessageQueue,默认是8个。
那现在对于顺序消费,就有了正确的理解了,顺序消费是只在一个MessageQueue内,顺序消费,我们验证一下吧,先看看发送端的日志,
i=0 BrokerName:broker-a i=1 BrokerName:broker-a i=2 BrokerName:broker-a i=3 BrokerName:broker-a i=4 BrokerName:broker-a5个消息都发送到了broker-a中,通过前面的改造程序,这5个消息应该都是在MessageQueue-0当中,再来看看消费端的日志,
this is simpleMQ,my NO is 0---Wed Jun 10 14:21:40 CST 2020 this is simpleMQ,my NO is 1---Wed Jun 10 14:21:41 CST 2020 this is simpleMQ,my NO is 2---Wed Jun 10 14:21:41 CST 2020 this is simpleMQ,my NO is 3---Wed Jun 10 14:21:41 CST 2020 this is simpleMQ,my NO is 4---Wed Jun 10 14:21:41 CST 2020这回是顺序消费了,每一个消费者都是等前面的消息消费完以后,才去消费下一个消息,这就完全解释的通了,我们再把消费端改成并发消费看看,如下:
@Bean(name = "pushConsumerOrderly", initMethod = "start",destroyMethod = "shutdown") public DefaultMQPushConsumer pushConsumerOrderly() throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pushConsumerOrderly"); consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;"); consumer.subscribe("broker-a-topic","*"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { Random random = new Random(); try { Thread.sleep(random.nextInt(5) * 1000); } catch (InterruptedException e) { e.printStackTrace(); } for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); return consumer; }这回使用的是并发消费,我们再看看结果,
i=0 BrokerName:broker-a i=1 BrokerName:broker-a i=2 BrokerName:broker-a i=3 BrokerName:broker-a i=4 BrokerName:broker-a5个消息都在broker-a中,并且知道它们都在同一个MessageQueue中,再看看消费端,
this is simpleMQ,my NO is 1---Wed Jun 10 14:28:00 CST 2020 this is simpleMQ,my NO is 0---Wed Jun 10 14:28:00 CST 2020 this is simpleMQ,my NO is 3---Wed Jun 10 14:28:00 CST 2020 this is simpleMQ,my NO is 2---Wed Jun 10 14:28:00 CST 2020 this is simpleMQ,my NO is 4---Wed Jun 10 14:28:00 CST 2020是乱序的,说明消费者是并发的消费这些消息的,即使它们在同一个MessageQueue中。
总结好了,到这里终于把顺序消费搞明白了,其中的关键就是Broker中还有多个MessageQueue,同一个MessageQueue中的消息才能顺序消费。