KafkaProducer 发送消息主要有以下 3 种方式:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value"); // 发送并忘记(fire-and-forget) producer.send(record); // 同步发送 Future<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get(); // 异步发送 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { } }); producer.close();具体的发送流程可以参考 KafkaProducer发送流程简析。
KafkaProducer 是线程安全的,多个线程可以共享同一个 KafkaProducer 对象。
配置解析 client.id该参数可以是任意的字符串,broker 会用它来识别消息的来源,会在日志和监控指标里展示。
bootstrap.servers 该属性指定 broker 的地址列表。
清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。
不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
这两个属性必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类。
生产者会使用这个类把键值对象序列化成字节数组。
设置 socket 读写数据时用到的 TCP 缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。
当生产者或消费者与 broker 处于不同的机房时,可以适当增大这些值。
设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。
如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
此时KafkaProducer.send()会阻塞等待内存释放,等待时间超过 max.block.ms 后会抛出超时异常。
该参数指定了消息被发送给 broker 之前,使用哪一种压缩算法(snappy,gzip或lz4)进行压缩。
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
该参数指定了一个批次可以使用的内存字节数(而不是消息个数)。
消息批次ProducerBatch包含了一组将要发送至同个分区的消息,当批次被填满,批次里的所有消息会被立即发送出去。
不过生产者并不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也可能被发送。
所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。
但如果设置得太小,生产者会频繁地发送消息,会增加一些额外的网络开销。
该参数指定了生产者在发送批次之前等待的时间。
生产者会在批次填满或等待时间达到 linger.ms 时把批次发送出去。
设置linger.ms>0会增加延迟,但也会提升吞吐量(一次性发送更多的消息,每个消息的开销就变小了)。
参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
这个参数决定令消息丢失的可能性:
acks=0 生产者发出消息后不等待来自服务器的响应
如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。
不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
acks=1 只要集群的 leader 节点收到消息,生产者就会收到一个来自服务器的成功响应
如果消息无法到达 leader 节点(比如:leader节点崩溃,新的 leader 还没有被选举出来),生产者会收到一个错误响应。
为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新 leader,消息还是会丢失。
这个时候的吞吐量取决于使用的是同步发送还是异步发送:
发送端阻塞等待服务器的响应(通过调用 Future.get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)
发送端使用回调可以缓解延迟问题,不过吞吐量仍受在途消息数量的限制(比如:生产者在收到服务器响应之前可以发送多少个消息)