最后,要考虑重试带来的重复问题,也就是幂等性。由于 WAL 日志仅附加,在发生客户端通信失败和重试时,日志可能包含重复的条目。当读取日志条目时,可能会需要确保重复项被忽略。但是如果存储类似于 HashMap,其中对同一键的更新是幂等的,则不需要排重,但是可能会存在 ABA 更新问题。一般都需要实现某种机制来标记每个请求的唯一标识符并检测重复请求。
举例 各种 MQ 中的类似于 CommitLog 的日志MQ 中的消息存储,由于消息队列的特性导致消息存储和日志类似,所以一般用日志直接作为存储。这个消息存储一般就是 WAL 这种设计模式,以 RocketMQ 为例子:
RocketMQ:
RocketMQ 存储首先将消息存储在 Commitlog 文件之中,这个文件采用的是 mmap (文件映射内存)技术写入与保存。关于这个技术,请参考另一篇文章JDK核心JAVA源码解析(5) - JAVA File MMAP原理解析
当消息来时,写入文件的核心方法是MappedFile的appendMessagesInner方法:
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { assert messageExt != null; assert cb != null; //获取当前写入位置 int currentPos = this.wrotePosition.get(); //如果当前写入位置小于文件大小则尝试写入 if (currentPos < this.fileSize) { //mappedByteBuffer是公用的,在这里不能修改其position影响读取 //mappedByteBuffer是文件映射内存抽象出来的文件的内存ByteBuffer //对这个buffer的写入,就相当于对文件的写入 //所以通过slice方法生成一个共享原有相同内存的新byteBuffer,设置position //如果writeBuffer不为空,则证明启用了TransientStorePool,使用其中缓存的内存写入 ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); byteBuffer.position(currentPos); AppendMessageResult result; //分单条消息还有批量消息的情况 if (messageExt instanceof MessageExtBrokerInner) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); } else if (messageExt instanceof MessageExtBatch) { result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } //增加写入大小 this.wrotePosition.addAndGet(result.getWroteBytes()); //更新最新消息保存时间 this.storeTimestamp = result.getStoreTimestamp(); return result; } log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize); return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); }RocketMQ 将消息存储在 Commitlog 文件后,异步更新 ConsumeQueue 还有 Index 文件。这个 ConsumeQueue 还有 Index 文件可以理解为存储状态,CommitLog 在这里扮演的就是 WAL 日志的角色:只有写入到 ConsumeQueue 的消息才会被消费者消费,只有 Index 文件中存在的记录才能被读取定位到。如果消息成功写入 CommitLog 但是异步更新还没执行,RocketMQ 进程挂掉了,这样就存在了不一致。所以在 RocketMQ 启动的时候,会通过如下机制保证 Commitlog 与 ConsumeQueue 还有 Index 的最终一致性.
入口是DefaultMessageStore的load方法:
public boolean load() { boolean result = true; try { //RocketMQ Broker启动时会创建${ROCKET_HOME}/store/abort文件,并添加JVM shutdownhook删除这个文件 //通过这个文件是否存判断是否为正常退出 boolean lastExitOK = !this.isTempFileExist(); log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally"); //加载延迟队列消息,这里先忽略 if (null != scheduleMessageService) { result = result && this.scheduleMessageService.load(); } //加载 Commit Log 文件 result = result && this.commitLog.load(); //加载 Consume Queue 文件 result = result && this.loadConsumeQueue(); if (result) { //加载存储检查点 this.storeCheckpoint = new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir())); //加载 index,如果不是正常退出,销毁所有索引上次刷盘时间小于索引文件最大消息时间戳的文件 this.indexService.load(lastExitOK); //进行 recover 恢复之前状态 this.recover(lastExitOK); log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset()); } } catch (Exception e) { log.error("load exception", e); result = false; } if (!result) { this.allocateMappedFileService.shutdown(); } return result; }