Kafka源码解析(二)---Log分析 (3)

minOneMessage:是否至少返回一条消息。设想如果消息很大,超过了 maxLength,正常情况下 read 方法永远不会返回任何消息。但如果设置了该参数为 true,read 方法就保证至少能够返回一条消息。

代码中使用了segments,来根据位移查找日志段:

private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

我们下面看看read方法具体做了哪些事:

由于没有使用锁,所以使用变量缓存当前的nextOffsetMetadata状态,作为最大索引LEO;

去日志段集合里寻找小于或等于指定索引的日志段;

校验异常情况:

startOffset是不是超过了LEO;

是不是日志段集合里没有索引小于startOffset;

startOffset小于Log Start Offset;

接下来获取一下隔离级别;

如果寻找的索引等于LEO,那么返回空;

如果寻找的索引大于LEO,返回空的消息集合,因为没法读取任何消息;

开始遍历日志段对象,直到读出东西来或者读到日志末尾;

首先找到日志段中最大的位置;

根据位移信息从日志段中读取日志信息(这个read方法我们上一篇已经讲解过了);

如果找不到日志信息,那么读取日志段集合中下一个日志段;

找了所有日志段的位移依然找不到,这可能是因为大于指定的日志位移的消息都被删除了,这种情况返回空;

我们在上面的read操作中可以看到,使用了segments来查找日志。我们主要看看删除操作

删除日志

删除日志的入口是:deleteOldSegments

// 如果topic deletion开关是打开的,那么会删去过期的日志段以及超过设置保留日志大小的日志 // 无论是否开启删除规则,都会删除在log start offset之前的日志段 def deleteOldSegments(): Int = { if (config.delete) { deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments() } else { deleteLogStartOffsetBreachedSegments() } }

deleteOldSegments方法会判断是否开启删除规则,如果开启,那么会分别调用:

deleteRetentionMsBreachedSegments删除segment的时间戳超过了设置时间的日志段;

deleteRetentionSizeBreachedSegments删除日志段空间超过设置空间大小的日志段;

deleteLogStartOffsetBreachedSegments删除日志段的baseOffset小于logStartOffset的日志段;

我这里列举一下这三个方法主要是怎么实现的:

private def deleteRetentionMsBreachedSegments(): Int = { if (config.retentionMs < 0) return 0 val startMs = time.milliseconds //调用deleteOldSegments方法,并传入匿名函数,判断当前的segment的时间戳是否超过了设置时间 deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs, reason = s"retention time ${config.retentionMs}ms breach") } private def deleteRetentionSizeBreachedSegments(): Int = { if (config.retentionSize < 0 || size < config.retentionSize) return 0 var diff = size - config.retentionSize //判断日志段空间是否超过设置空间大小 //shouldDelete函数会将传入的日志段去减diff,直到小于等于零 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = { if (diff - segment.size >= 0) { diff -= segment.size true } else { false } } deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach") } private def deleteLogStartOffsetBreachedSegments(): Int = { //shouldDelete函数主要判断日志段的baseOffset是否小于logStartOffset def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = nextSegmentOpt.exists(_.baseOffset <= logStartOffset) deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach") }

这种写代码的方式非常的灵活,通过不同方法设置不同的函数来实现代码复用的目的,最后都是通过调用deleteOldSegments来实现删除日志段的目的。

下面我们来看一下deleteOldSegments的操作:

deleteOldSegments

这个deleteOldSegments方法和上面的入口方法传入的参数是不一致的,这个方法传入了一个predicate函数,用于判断哪些日志段是可以被删除的,reason用来说明被删除的原因。

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = { //删除任何匹配到predicate规则的日志段 lock synchronized { val deletable = deletableSegments(predicate) if (deletable.nonEmpty) info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason") deleteSegments(deletable) } }

这个方法调用了两个主要的方法,一个是deletableSegments,用于获取可以被删除的日志段的集合;deleteSegments用于删除日志段。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywjyx.html