同样,Consumer 的 main 方法,也非常简单。
```java
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
```
执行后会看到一堆消息被成功消费。
... ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1239, sysFlag=0, bornTimestamp=1619580615742, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615742, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F5246, commitLogOffset=1004102, bodyCRC=493758879, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83E03B8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 53, 50], transactionId='null'}]] ConsumeMessageThread_8 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1238, sysFlag=0, bornTimestamp=1619580615740, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615740, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4F1A, commitLogOffset=1003290, bodyCRC=1688269248, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83C03B4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 56], transactionId='null'}]] ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1237, sysFlag=0, bornTimestamp=1619580615737, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615737, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4BEE, commitLogOffset=1002478, bodyCRC=1830206955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, 
UNIQ_KEY=7F000001162C18B4AAC28D83A83903B0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 52], transactionId='null'}]] ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1236, sysFlag=0, bornTimestamp=1619580615735, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615735, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F48C2, commitLogOffset=1001666, bodyCRC=1786477042, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83703AC, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 48], transactionId='null'}]] ConsumeMessageThread_18 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1235, sysFlag=0, bornTimestamp=1619580615733, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615733, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F4596, commitLogOffset=1000854, bodyCRC=1280920064, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83503A8, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 54], transactionId='null'}]] ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1234, sysFlag=0, bornTimestamp=1619580615731, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615731, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F426A, commitLogOffset=1000042, bodyCRC=1261735449, 
reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83303A4, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 50], transactionId='null'}]] ConsumeMessageThread_10 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1233, sysFlag=0, bornTimestamp=1619580615729, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615729, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3F3E, commitLogOffset=999230, bodyCRC=855266886, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A83103A0, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 50, 56], transactionId='null'}]] ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1232, sysFlag=0, bornTimestamp=1619580615727, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615728, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F3C12, commitLogOffset=998418, bodyCRC=994843245, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82F039C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 50, 52], transactionId='null'}]] ConsumeMessageThread_11 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1231, sysFlag=0, bornTimestamp=1619580615725, 
bornHost=/10.232.74.132:55094, storeTimestamp=1619580615726, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F38E6, commitLogOffset=997606, bodyCRC=1008852596, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82D0398, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 50, 48], transactionId='null'}]] ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=DESKTOP-Q6M76VI, queueId=3, storeSize=203, queueOffset=1230, sysFlag=0, bornTimestamp=1619580615723, bornHost=/10.232.74.132:55094, storeTimestamp=1619580615724, storeHost=/10.232.74.132:10911, msgId=0AE84A8400002A9F00000000000F35BA, commitLogOffset=996794, bodyCRC=2121214082, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1251, CONSUME_START_TIME=1619580616221, UNIQ_KEY=7F000001162C18B4AAC28D83A82B0394, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 49, 54], transactionId='null'}]] ...一个生产,一个消费,清晰明了,体验很好。
嗯,到此为止,quick-start 就基本跑通了,也就知道这玩意最简单的用法了。
看看整体架构
此时已经上手体验了一把用法,来总结下。
1. 我启动了一个 namesrv,暴露的端口是 9876。
2. 我又启动了一个 broker,启动时加了个参数 -n localhost:9876,很明显,就是指向了刚刚那个 namesrv。
3. 然后我启动 producer 发了一堆消息。
4. 最后我启动了 consumer,就神奇地收到了这些消息。
启动 producer 和 consumer 时,都必须设置一个名为 NAMESRV_ADDR 的环境变量,值为 localhost:9876