kafka知识体系50-生产者编程实践

linux集群安装过程请参考。
window安装过程如下:

下载zookeeper安装包(zookeeper-3.4.6),解压到D:\Program\zookeeper,并设置环境变量

添加系统变量ZOOKEEPER_HOME=D:\Program\zookeeper,并在path后面添加:%ZOOKEEPER_HOME%\bin

将zoo_sample.cfg重命名为zoo.cfg,修改内容如下:

tickTime=4000 initLimit=10 syncLimit=5 dataDir=D:/Program/zookeeper/data clientPort=2181 maxClientCnxns=60 server.1=localhost:2888:3888

D:/Program/zookeeper/data目录下新建文件myid,并用文本软件打开,填入数字1

下载kafka安装包(kafka_2.11-0.10.1.0),解压到D:\Program\kafka,并设置环境变量

添加系统变量KAFKA_HOME=D:\Program\kafka,并在path后面添加:%KAFKA_HOME%\bin

修改D:\Program\kafka\config\server.properties配置文件如下:

broker.id=0 advertised.listeners=PLAINTEXT://LAPTOP-2CBRDCI0:9092 advertised.port=9092 advertised.host.name=LAPTOP-2CBRDCI0 log.dirs=D:/Program/kafka/data/kafka-logs zookeeper.connect=localhost:2181/kafka zookeeper.connection.timeout.ms=60000

启动zookeeper
双击脚本D:\Program\zookeeper\bin\zkServer.cmd

启动kafka

命令行运行 D:\Program\kafka\bin\windows\kafka-server-start.bat D:/Program/kafka/config/server.properties kafka创建topic D:\Program\kafka\bin\windows\kafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --create --topic TEST1 --replication-factor 1 --partitions 3 D:\Program\kafka\bin\windows\kafka-topics.bat --zookeeper LAPTOP-2CBRDCI0:2181/kafka --describe --topic TEST 实践 依赖

kafka 0.10.1.0版本中采用KafkaProducer对象用来向kafka broker集群发送消息。
编写代码前先引入相关依赖包:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency> 基本配置和发送流程

KafkaProducer是线程安全的,即可以跨线程共享单个KafkaProducer实例,我们先看单线程发送消息的示例,以了解kafka发送消息的流程。

package com.molyeo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Created by zhangkh on 2018/7/11. */ public class SinglekafkaProducerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.flush(); producer.close(); } }

我们先构建一个props实例,用于保存kafka配置。
acks
acks配置项表示消息的确认机制。
acks=0表示生者不会等服务端确认,消息被立即添加到socket buffer中,并认为已经发送。这种情况下由于客户端不知道消息是否真实发送成功,配置项中的重试次数项retries也不会生效(即不会重试),每条消息返回的offset值均为-1。
acks=1表示消息的leader分区收到消息后则被视为消息已发送成功,不会等待副本分区确认。如果leader分区收到消息后,然后所在节点立即宕机,follower分区还来不同步,则消息丢失。
acks=all或者acks=-1,表示消息的leader和follower分区均已收到后才被视为消息已成功发送。这是最严格的确认机制,只要至少min.insync.replicas还活着,则消息不会丢失。

retries
如果网络原因或者其他异常导致发送请求失败,生产端可以根据参数retries进行重试。

batch.size
生产者为每个分区维护未发送消息的缓冲区,缓冲区的大小及batch.size,默认配置为16384,即16KB

linger.ms
逗留时间,默认为0,即使缓冲区有其他未使用的空间,也可以立即发送。
如果我们希望减少服务端的压力,则可以延迟一定时间,待消息量比较大时批量发送。
简单点说,只要满足batch.size和linger.ms中的一个条件,生产者发送线程则会发送请求,具体的要分析org.apache.kafka.clients.producer.internals.Sender类。

buffer.memory
生产者总的消息缓冲区,超过该大小,阻塞max.block.ms。

生产者其他配置项可参考

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zydzxz.html