Kafka源码解析(二)---Log分析 (5)

analyzeAndValidateRecords

/**
 * Walks every record batch in `records`, validates it, and gathers the summary
 * values used to build a [[LogAppendInfo]].
 *
 * For each batch, in this order:
 *  1. a batch with magic >= V2 that comes from a client must have a base offset of 0;
 *  2. the first batch seeds the first offset (magic >= V2 only) and the last offset of
 *     the first batch;
 *  3. the batch's last offset is compared with the previously seen one to track whether
 *     offsets are strictly increasing;
 *  4. the batch size is checked against `config.maxMessageSize`;
 *  5. `batch.isValid` is checked;
 *  6. the max timestamp (and its offset), the batch count, the valid byte count and the
 *     source compression codec are updated.
 *
 * @param records      the records whose batches are inspected
 * @param isFromClient whether the append originates from a client; only then is the
 *                     base-offset check of step 1 applied
 * @return a `LogAppendInfo` built from the collected values; `RecordBatch.NO_TIMESTAMP`,
 *         `logStartOffset` and `RecordConversionStats.EMPTY` are passed through for the
 *         arguments this method does not compute
 * @throws InvalidRecordException  if a client batch with magic >= V2 has a non-zero base offset
 * @throws RecordTooLargeException if a batch is larger than `config.maxMessageSize`
 * @throws CorruptRecordException  if `batch.isValid` is false
 */
private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
  var batchCount = 0
  var totalValidBytes = 0
  var firstOffsetOpt: Option[Long] = None
  var lastSeenOffset = -1L
  var detectedCodec: CompressionCodec = NoCompressionCodec
  var offsetsMonotonic = true
  var largestTimestamp = RecordBatch.NO_TIMESTAMP
  var offsetOfLargestTimestamp = -1L
  var sawFirstBatch = false
  var firstBatchLastOffset = -1L

  records.batches.asScala.foreach { batch =>
    // Only V2 and newer batches are subject to the base-offset rule, to avoid
    // compatibility problems with older clients.
    val isV2OrNewer = batch.magic >= RecordBatch.MAGIC_VALUE_V2
    if (isV2OrNewer && isFromClient && batch.baseOffset != 0)
      throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
        s"be 0, but it is ${batch.baseOffset}")

    val batchLastOffset = batch.lastOffset

    // First batch only: for V2+ the first offset comes from the batch header; for older
    // formats it is left as None (no exact first offset is recorded here), and only the
    // last offset of the first batch is remembered.
    if (!sawFirstBatch) {
      if (isV2OrNewer)
        firstOffsetOpt = Some(batch.baseOffset)
      firstBatchLastOffset = batchLastOffset
      sawFirstBatch = true
    }

    // Offsets must strictly increase from one batch to the next; a last offset that is
    // not greater than the previous one breaks monotonicity.
    if (lastSeenOffset >= batchLastOffset)
      offsetsMonotonic = false
    lastSeenOffset = batchLastOffset

    // Size check. Note that the rejected-bytes meters are marked with the size of the
    // whole `records` input, not just the offending batch.
    val batchSize = batch.sizeInBytes
    if (batchSize > config.maxMessageSize) {
      brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
      brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
      throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
        s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
    }

    // Integrity check (CRC, per the error message); a failure is counted before throwing.
    if (!batch.isValid) {
      brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
      throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
    }

    // Track the largest timestamp seen so far together with the last offset of the
    // batch that carried it.
    val batchMaxTimestamp = batch.maxTimestamp
    if (batchMaxTimestamp > largestTimestamp) {
      largestTimestamp = batchMaxTimestamp
      offsetOfLargestTimestamp = lastSeenOffset
    }

    batchCount += 1
    totalValidBytes += batchSize

    // Remember the codec of the most recent compressed batch; uncompressed batches
    // never overwrite a codec that was already detected.
    val codecOfBatch = CompressionCodec.getCompressionCodec(batch.compressionType.id)
    if (codecOfBatch != NoCompressionCodec)
      detectedCodec = codecOfBatch
  }

  // Resolve the codec to write with from the configured compression type and the
  // codec detected in the input.
  val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, detectedCodec)

  LogAppendInfo(
    firstOffsetOpt,
    lastSeenOffset,
    largestTimestamp,
    offsetOfLargestTimestamp,
    RecordBatch.NO_TIMESTAMP,
    logStartOffset,
    RecordConversionStats.EMPTY,
    detectedCodec,
    targetCodec,
    batchCount,
    totalValidBytes,
    offsetsMonotonic,
    firstBatchLastOffset)
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywjyx.html