Kafka源码解析(二)---Log分析 (2)

这两个方法主要的区别是,updateHighWatermark 方法,主要用在 Follower 副本从 Leader 副本获取到消息后更新高水位值。而 maybeIncrementHighWatermark 方法,主要是用来更新 Leader 副本的高水位值。

上面的方法中通过调用fetchHighWatermarkMetadata来获取高水位的值,我们下面看看这个方法:

fetchHighWatermarkMetadata

private def fetchHighWatermarkMetadata: LogOffsetMetadata = { // 读取时确保日志不能被关闭 checkIfMemoryMappedBufferClosed() val offsetMetadata = highWatermarkMetadata if (offsetMetadata.messageOffsetOnly) {//没有获得到完整的高水位元数据 lock.synchronized { // 通过读日志文件的方式把完整的高水位元数据信息拉出来 val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) updateHighWatermarkMetadata(fullOffset) fullOffset } } else { offsetMetadata } } private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { //通过给的offset,去日志文件中找到相应的日志信息 val fetchDataInfo = read(offset, maxLength = 1, isolation = FetchLogEnd, minOneMessage = false) fetchDataInfo.fetchOffsetMetadata }

然后我们提前看一下日志的read方法,是如何根据索引读取数据的:

日志段操作 日志读取操作

read

def read(startOffset: Long, maxLength: Int, isolation: FetchIsolation, minOneMessage: Boolean): FetchDataInfo = { maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") { trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes") //convertToOffsetMetadataOrThrow传进来是FetchLogEnd,所以这里是false val includeAbortedTxns = isolation == FetchTxnCommitted // 由于没有使用锁,所以使用变量缓存当前的nextOffsetMetadata状态 val endOffsetMetadata = nextOffsetMetadata val endOffset = endOffsetMetadata.messageOffset // 到日字段中根据索引寻找最近的日志段 var segmentEntry = segments.floorEntry(startOffset) // return error on attempt to read beyond the log end offset or read below log start offset // 这里给出了几种异常场景: // 1. 给的日志索引大于最大值; // 2. 通过索引找的日志段为空; // 3. 给的日志索引小于logStartOffset if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset) throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments in the range $logStartOffset to $endOffset.") //convertToOffsetMetadataOrThrow传进来是FetchLogEnd,所以最大值是endOffsetMetadata // 查看一下读取隔离级别设置。 // 普通消费者能够看到[Log Start Offset, LEO)之间的消息 // 事务型消费者只能看到[Log Start Offset, Log Stable Offset]之间的消息。Log Stable Offset(LSO)是比LEO值小的位移值,为Kafka事务使用 // Follower副本消费者能够看到[Log Start Offset,高水位值]之间的消息 val maxOffsetMetadata = isolation match { case FetchLogEnd => endOffsetMetadata case FetchHighWatermark => fetchHighWatermarkMetadata case FetchTxnCommitted => fetchLastStableOffsetMetadata } //如果寻找的索引等于maxOffsetMetadata,那么直接返回 if (startOffset == maxOffsetMetadata.messageOffset) { return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) //如果寻找的索引大于maxOffsetMetadata,返回空的消息集合,因为没法读取任何消息 } else if (startOffset > maxOffsetMetadata.messageOffset) { val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset) return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns) } // 开始遍历日志段对象,直到读出东西来或者读到日志末尾 while (segmentEntry != null) { val segment = segmentEntry.getValue // 找到日志段中最大的日志位移 val maxPosition = { if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) { maxOffsetMetadata.relativePositionInSegment } else { segment.size } } // 根据位移信息从日志段中读取日志信息 val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) // 如果找不到日志信息,那么去日志段集合中找更大的日志位移的日志段 if (fetchInfo == null) { segmentEntry = segments.higherEntry(segmentEntry.getKey) } else { return if (includeAbortedTxns) addAbortedTransactions(startOffset, segmentEntry, fetchInfo) else fetchInfo } } //找了所有日志段的位移依然找不到,这可能是因为大于指定的日志位移的消息都被删除了,这种情况返回空 FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) } }

read方法,有四个参数,分别是:

startOffset:读取的日志索引位置。

maxLength:读取数据量长度。

isolation:隔离级别,多用于 Kafka 事务。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywjyx.html