Kafka源码解析(二)---Log分析

一般来说Log 的常见操作分为 4 大部分。

高水位管理操作

日志段管理

关键位移值管理

读写操作

其中关键位移值管理主要包含Log Start Offset 和 LEO等。

高水位HighWatermark 高水位HighWatermark初始化

高水位是通过LogOffsetMetadata类来定义的:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

这里传入的初始值是logStartOffset,表明当首次构建高水位时,它会被赋值成 Log Start Offset 值。

我们再来看看LogOffsetMetadata类:

case class LogOffsetMetadata(messageOffset: Long, segmentBaseOffset: Long = Log.UnknownOffset, relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) { // check if this offset is already on an older segment compared with the given offset def onOlderSegment(that: LogOffsetMetadata): Boolean = { if (messageOffsetOnly) throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info") this.segmentBaseOffset < that.segmentBaseOffset } ... }

LogOffsetMetadata有三个初始值:

messageOffset表示消息位移值;

segmentBaseOffset保存消息位移值所在日志段的起始位移,用来判断两条消息是否处于同一个日志段的;

relativePositionSegment保存消息位移值所在日志段的物理磁盘位置;

上面的onOlderSegment表明,要比较哪个日志段更老,只需要比较segmentBaseOffset的大小就可以了。

高水位HighWatermark设值与更新 private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = { //高水位的值不可能小于零 if (newHighWatermark.messageOffset < 0) throw new IllegalArgumentException("High watermark offset should be non-negative") lock synchronized {// 保护Log对象修改的Monitor锁 highWatermarkMetadata = newHighWatermark// 赋值新的高水位值 //事务相关,暂时忽略 producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) //事务相关,暂时忽略 maybeIncrementFirstUnstableOffset() } trace(s"Setting high watermark $newHighWatermark") }

设置高水位的值是很简单的,首先校验高水位的值是否大于零,然后通过直接加锁之后更新高水位的值。

更新更新高水位值的方法有两个:updateHighWatermark 和 maybeIncrementHighWatermark,我们分别分析。

updateHighWatermark

def updateHighWatermark(hw: Long): Long = { //传入的高水位的值如果小于logStartOffset,设置为logStartOffset val newHighWatermark = if (hw < logStartOffset) logStartOffset // 传入的高水位的值如果大于LEO,那么设置为LEO else if (hw > logEndOffset) logEndOffset else hw //将newHighWatermark封装成一个LogOffsetMetadata然后更新高水位的值 updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark)) //返回新的高水位的值 newHighWatermark }

这个方法逻辑也很简洁,因为高水位的值是不可能大于LEO,也不可能小于logStartOffset,所以需要对传入的hw校验然后设置成正确的值,然后调用上面的设置高水位的方法设值。

maybeIncrementHighWatermark

/** * Update the high watermark to a new value if and only if it is larger than the old value. It is * an error to update to a value which is larger than the log end offset. * * This method is intended to be used by the leader to update the high watermark after follower * fetch offsets have been updated. * * @return the old high watermark, if updated by the new value */ // 当新的高水位的值大于旧的高水位的值时才做更新,如果新的高水位的值大于LEO,会报错 // 这个方法是leader在确认Follower已经拉取了日志之后才做更新 def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = { //如果新的高水位的值大于LEO,会报错 if (newHighWatermark.messageOffset > logEndOffset) throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " + s"log end offset $logEndOffsetMetadata") lock.synchronized { // 获取老的高水位值 val oldHighWatermark = fetchHighWatermarkMetadata // Ensure that the high watermark increases monotonically. We also update the high watermark when the new // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment. //只有当新的高水位值大于老的值,因为要维护高水位的单调递增性 //或者当新的高水位值和老的高水位值相等,但是新的高水位在一个新的日志段上面时才做更新 if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) { updateHighWatermarkMetadata(newHighWatermark) Some(oldHighWatermark)// 返回老的高水位值 } else { None } } }

这个方法我将这个方法的英文注释贴出来了,这个注释的说明我也写到方法上了,逻辑很清楚,大家看看注释应该能理解。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywjyx.html