RocketMQ部署的时候一个broker set会有一个mater和一个或者多个slave,salve起到的作用就是同步master存储的的消息,并且会接收部分consumer读取消息的请求,下面围绕两个问题来阐明怎么做的冗余:
怎么实现冗余
冗余之后的消息读取
怎么实现冗余?RocketMQ通过主从结构来实现消息冗余,master接收来自producer发送来的消息,然后同步消息到slave,根据master的role不同,同步的时机可分为两种不同的情况:
SYNC_MASTER:如果master是这种角色,每次master在将producer发送来的消息写入内存(磁盘)的时候会同步等待master将消息传输到slave
ASYNC_MASTER:这种角色下消息会异步复制到slave
这里注意的是master传输到slave只有CommitLog的物理文件。
master和slave之间传输CommitLog的主要流程如下:
这里主要涉及到两个class(包括其内部类):HAService、HAConnection(这两个类的源码中文注释可以在这里找到)。
broker在启动的时候会调用DefaultMessageStore.start方法,这里面会调用HAService.start来启动相关的服务:
AcceptSocketService:启动serverSocket并监听来自HAClient的连接
GroupTransferService:broker写消息的时候如果需要同步等待消息同步到slave,会用到这个服务
HAClient:如果是slave,才会启动haClient。
master和slave之间的数据通信过程是:
master启动之后会监听来自slave的连接,slave启动之后会主动连接到master。
在连接建立之后,slave会向master上报自己的本地的CommitLog的offset
master根据slave的offset来决定从那里开始向slave发送数据
slave发送给master的数据格式:
offset(8字节)
offset:slave本地CommitLog的maxOffset
master发送给slave的数据格式:
header(offset(8字节) + bodySize(4字节)) + body
offset:由于master发送给slave的CommitLog的单位是MappedFile的个数,这个offset是MappedFile的起始位置
bodySize:MappedFile的大小
body:MappedFile的内容
前面说过SYNC_MASTER和ASYNC_MASTER传输数据给slave的过程稍有不同,下面先看看ASYNC_MASTER怎么传输数据到slave的。
SYNC_MASTER同步数据到slavesalve连接到master,向master上报slave当前的offset
master收到后确认给slave发送数据的开始位置
master查询开始位置对应的MappedFIle
master将查找到的数据发送给slave
slave收到数据后保存到自己的CommitLog
// org.apache.rocketmq.store.ha.HAService.HAClient#run public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 只有配置了HAMaster地址的broker才会连接到master if (this.connectMaster()) { if (this.isTimeToReportOffset()) { // 向master上报slave本地最大的CommitLog的offset boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { this.closeMaster(); } } this.selector.select(1000); // 处理socket上的read事件,也就是处理master发来的数据 boolean ok = this.processReadEvent(); // 省略中间代码... } // master收到slave上报的offset后用下面的方法处理 // org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent private boolean processReadEvent() { int readSizeZeroTimes = 0; if (!this.byteBufferRead.hasRemaining()) { this.byteBufferRead.flip(); this.processPostion = 0; } while (this.byteBufferRead.hasRemaining()) { try { int readSize = this.socketChannel.read(this.byteBufferRead); if (readSize > 0) { readSizeZeroTimes = 0; this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); if ((this.byteBufferRead.position() - this.processPostion) >= 8) { int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); long readOffset = this.byteBufferRead.getLong(pos - 8); this.processPostion = pos; // slave上报过来的offset说明offset之前的数据slave都已经收到 HAConnection.this.slaveAckOffset = readOffset; if (HAConnection.this.slaveRequestOffset < 0) { // 如果是刚刚和slave建立连接,需要知道slave需要从哪里开始接收commitLog HAConnection.this.slaveRequestOffset = readOffset; log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); } // 如果收到来自slave的确认之后,唤醒等待同步到slave的线程(如果是SYNC_MASTER) HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); // 省略中间代码... }