RocketMQ(九):消息发送(续)

RocketMQ(九):消息发送(续)

NameServer:在系统中是做命名服务,更新和发现 broker服务。

Broker-Master:broker 消息主机服务器。

Broker-Slave: broker 消息从机服务器。

Producer: 消息生产者。

Consumer: 消息消费者。

说明: rocketmq系列都将会以rocketmq-4.1.0-incubating进行介绍。

在阅读源码时做了一定的注释,公众号【匠心零度】回复:rocketmq,可获得基于rocketmq4.1.0加详细中文代码注释 。欢迎大家 star、fork !

上篇RocketMQ(八):消息发送主要分析了下一般发送流程,本篇将会介绍下定时发送,顺序发送,批量发送等情况 。

消息发送概述

RocketMQ(九):消息发送(续)

上面的图大概就是producer发送message到broker的核心逻辑了。

备注:本篇将会介绍下定时发送,顺序发送,批量发送情况 ,这些情况说明都是站在producer角度进行说明,涉及到broker的内容会在分析broker相关内容的时候进行分析。

定时发送 何为定时消息

Producer 将消息发送到 MQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费。

调用形式

与普通的调用发送基本没有什么区别,唯一就是多了一个消息设置setDelayTimeLevel即可。代码调用如图:

RocketMQ(九):消息发送(续)

那么这里的级别应该填写什么呢?对应什么呢?--->这块后续会分析,今天这里简单说明下:

RocketMQ(九):消息发送(续)

其他的就和普通发送没有任何区别了,关键处理在broker,后续分析。

顺序发送 何为顺序发送

参考:https://help.aliyun.com/document_detail/49319.html?spm=a2c4g.11186623.2.3.21aKRd

RocketMQ(九):消息发送(续)

调用形式

RocketMQ(九):消息发送(续)

我们来看看具体内容实现调用就明白怎么回事了:

RocketMQ(九):消息发送(续)

获取到所有可发送的信息

发送内容

用来区分(比如订单号,这一类的订单号是有顺序的,但是和其他订单号可以无序)

sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);

这里mqs就是获取到所有可发送的信息,而orderId就是用来区分的,每次拿这个orderId和所有的进行取模(如果所有的没有变化的情况下面)那么一种都是固定的那个队列了(队列就是先进先出了),后续消费在分析消费如何处理这块。

备注:这里的顺序也是相对的,如果添加了broker或者broker减少了,那么取模的信息势必会不一致,所以需要明白下。

批量发送 何为批量发送

批量这个概念我相信大家一定都非常熟悉了,在很多调优的时候,比如数据库批量处理,有些请求进行合并发送等都是类似实现,那么rocketmq批量发送也是为了追求性能,特别在数量量级特别大的时候,批量效果就非常明显了。

备注:既然是批量就是等一批之后发送,那么实时性一定可能就稍微有点点延迟了(可以忽略,特别数据量多的情况)。我一般处理批量的就2个维度,达到一个就可以触发,1.达到给定条数 2.达到给定时间(比如3s一次,如果这个时候数据量还是不够也会进行发送)……

rocketmq只是提供接口,发送一批数据,那么何时发送这一批数据就是根据自己的选择,就是上面说的两条就是一般的做法:

达到给定条数 。

达到给定时间。

调用形式

rocketmq这里有一个好处,就是在批量发送的时候,会做一个简单的转换,减少网络传输,学习下:

RocketMQ(九):消息发送(续)

把这些转化为a、b、c等形式。

我们来看看rocketmq给我们的批量发送的例子,为什么会给我们2个例子呢?需要思考!!!,下面我会来为大家讲解下:

备注: 批量发送需要相同的topic以及相同的waitStoreMsgOK 和不支持定时发送。

Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support。

RocketMQ(九):消息发送(续)

这种发送就是批量(这里的批量不是特别大),就自己发送走了。

RocketMQ(九):消息发送(续)

这里的批量(批量的数据太多),所以进行分割之后发送了。最后调用的方法还是一样的。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsyyw.html