如何系统的了解Kafka (2)

send(key,value,topic):在这里,默认的HashPartitioner用于确定要写入消息的分区,方式查找key的Hash并进行取模,该Topic的分区,也可以编写我们自己定义的分区程序;

send(null,value,topic):在这种情况下,消息以循环方式存储在所有分区中。

Java生产者示例代码如下:

public class JProducer extends Thread { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); long counter = 1L; while (true) { String json = "{\"id\":" + (counter++) + ",\"date\":\"" + new Date().toString() + "\"}"; String k = "key" + counter; producer.send(new ProducerRecord<String, String>("test01", k, json), (recordMetadata, e) -> { if (e == null) { System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset()); } else { e.printStackTrace(); } }); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // producer.close(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsgpw.html