Kafka系列2:深入理解Kafka消费者

Kafka系列2:深入理解Kafka消费者

上篇聊了Kafka概况,包含了Kafka的基本概念、设计原理,以及设计核心。本篇单独聊聊Kafka的消费者,包括如下内容:

生产者是如何生产消息

如何创建生产者

发送消息到Kafka

生产者配置

分区

生产者是如何生产消息的

首先来看一下Kafka生产者组件图

Kafka系列2:深入理解Kafka消费者

(生产者组件图。图片来源:《Kafka权威指南》)

第一步,Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。第二步,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

如何创建生产者 属性设置

在创建生产者对象的时候,要设置一些属性,有三个属性是必选的:

bootstrap.servers:指定Broker的地址清单,地址格式为host:port。清单里不需要包含所有的Broker地址,生产者会从给定的Broker里查找到其他Broker的信息;不过建议至少要提供两个Broker的信息保证容错。

key.serializer:指定键的序列化器。Broker希望接收到的消息的键和值都是字节数组。这个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer、StringSerializer和IntegerSerializer,因此一般不需要实现自定义的序列化器。需要注意的是,key.serializer属性是必须设置的,即使只发送值内容。

value.serializer:指定值的序列化器。如果键和值都是字符串,可以使用与key.serializer一样的序列化器,否则需要使用不同的序列化器。

项目依赖

以maven项目为例,要使用Kafka客户端,需要引入kafka-clients依赖:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzjfy.html