在PullConsumer中，RocketMQ为消息拉取提供了很多API，但总的来说分为两种：同步消息拉取和异步消息拉取。
同步消息拉取
以同步方式拉取消息都是通过DefaultMQPullConsumerImpl的pullSyncImpl方法实现的：
1 private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, 2 long timeout) 3 throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 4 this.makeSureStateOK(); 5 6 if (null == mq) { 7 throw new MQClientException("mq is null", null); 8 } 9 10 if (offset < 0) { 11 throw new MQClientException("offset < 0", null); 12 } 13 14 if (maxNums <= 0) { 15 throw new MQClientException("maxNums <= 0", null); 16 } 17 18 this.subscriptionAutomatically(mq.getTopic()); 19 20 int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); 21 22 long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; 23 24 boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); 25 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( 26 mq, 27 subscriptionData.getSubString(), 28 subscriptionData.getExpressionType(), 29 isTagType ? 0L : subscriptionData.getSubVersion(), 30 offset, 31 maxNums, 32 sysFlag, 33 0, 34 this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), 35 timeoutMillis, 36 CommunicationMode.SYNC, 37 null 38 ); 39 this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); 40 if (!this.consumeMessageHookList.isEmpty()) { 41 ConsumeMessageContext consumeMessageContext = null; 42 consumeMessageContext = new ConsumeMessageContext(); 43 consumeMessageContext.setConsumerGroup(this.groupName()); 44 consumeMessageContext.setMq(mq); 45 consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); 46 consumeMessageContext.setSuccess(false); 47 this.executeHookBefore(consumeMessageContext); 48 consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); 49 consumeMessageContext.setSuccess(true); 50 this.executeHookAfter(consumeMessageContext); 51 } 52 return pullResult; 53 }