进行恢复是DefaultMessageStore的recover方法:
private void recover(final boolean lastExitOK) { long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue(); //根据上次是否正常退出,采用不同的恢复方式 if (lastExitOK) { this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue); } else { this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue); } this.recoverTopicQueueTable(); }当上次正常退出时:
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { //只扫描最后三个文件 int index = mappedFiles.size() - 3; if (index < 0) index = 0; MappedFile mappedFile = mappedFiles.get(index); ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { //检验存储消息是否有效 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); //如果有效,添加这个偏移 if (dispatchRequest.isSuccess() && size > 0) { mappedFileOffset += size; } //如果有效,但是大小是0,代表到了文件末尾,切换文件 else if (dispatchRequest.isSuccess() && size == 0) { index++; if (index >= mappedFiles.size()) { // Current branch can not happen log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset(); mappedFileOffset = 0; log.info("recover next physics file, " + mappedFile.getFileName()); } } //只有有无效的消息,就在这里停止,之后会丢弃掉这个消息之后的所有内容 else if (!dispatchRequest.isSuccess()) { log.info("recover physics file end, " + mappedFile.getFileName()); break; } } processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); //根据有效偏移量,删除这个偏移量以后的所有文件,以及所有文件(正常是只有最后一个有效文件,而不是所有文件)中大于这个偏移量的部分 this.mappedFileQueue.truncateDirtyFiles(processOffset); //根据 commit log 中的有效偏移量,清理 consume queue if (maxPhyOffsetOfConsumeQueue >= processOffset) { log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } else { //所有commit log都删除了,那么偏移量就从0开始 log.warn("The commitlog files are deleted, and delete the consume queue files"); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); } }当上次没有正常退出时:
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) { boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover(); final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles(); if (!mappedFiles.isEmpty()) { // 从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件 // 从这个文件开始恢复消息,因为里面的消息有成功写入过 consumer queue 以及 index 的,所以从这里恢复一定能保证最终一致性 // 但是会造成某些已经写入过 consumer queue 的消息再次写入,也就是重复消费。 int index = mappedFiles.size() - 1; MappedFile mappedFile = null; for (; index >= 0; index--) { mappedFile = mappedFiles.get(index); //寻找第一个有正常消息的文件 if (this.isMappedFileMatchedRecover(mappedFile)) { log.info("recover from this mapped file " + mappedFile.getFileName()); break; } } //如果小于0,就恢复所有 commit log,或者代表没有 commit log if (index < 0) { index = 0; mappedFile = mappedFiles.get(index); } ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); long processOffset = mappedFile.getFileFromOffset(); long mappedFileOffset = 0; while (true) { //验证消息有效性 DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover); int size = dispatchRequest.getMsgSize(); //如果消息有效 if (dispatchRequest.isSuccess()) { if (size > 0) { mappedFileOffset += size; if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) { //如果允许消息重复转发,则需要判断当前消息是否消息偏移小于已确认的偏移,只有小于的进行重新分发 if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) { //重新分发消息,也就是更新 consume queue 和 index this.defaultMessageStore.doDispatch(dispatchRequest); } } else { //重新分发消息,也就是更新 consume queue 和 index this.defaultMessageStore.doDispatch(dispatchRequest); } } //大小为0代表已经读完,切换下一个文件 else if (size == 0) { index++; if (index >= mappedFiles.size()) { // The current branch under normal circumstances should // not happen log.info("recover physics file over, last mapped file " + mappedFile.getFileName()); break; } else { mappedFile = mappedFiles.get(index); byteBuffer = mappedFile.sliceByteBuffer(); processOffset = mappedFile.getFileFromOffset(); mappedFileOffset = 0; log.info("recover next physics file, " + mappedFile.getFileName()); } } } else { log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position()); break; } } //更新偏移 processOffset += mappedFileOffset; this.mappedFileQueue.setFlushedWhere(processOffset); this.mappedFileQueue.setCommittedWhere(processOffset); this.mappedFileQueue.truncateDirtyFiles(processOffset); //清理 if (maxPhyOffsetOfConsumeQueue >= processOffset) { log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset); this.defaultMessageStore.truncateDirtyLogicFiles(processOffset); } } // Commitlog case files are deleted else { log.warn("The commitlog files are deleted, and delete the consume queue files"); this.mappedFileQueue.setFlushedWhere(0); this.mappedFileQueue.setCommittedWhere(0); this.defaultMessageStore.destroyLogics(); } }总结起来就是:
首先,根据 abort 文件是否存在判断上次是否正常退出。
对于正常退出的:
扫描倒数三个文件,记录有效消息的偏移
扫描到某个无效消息结束,或者扫描完整个文件
设置最新偏移,同时根据这个偏移量清理 commit log 和 consume queue
对于没有正常退出的:
从最后一个文件开始,向前寻找第一个正常的可以恢复消息的文件