Kafka Knowledge System 50: Producer Programming Practice (2)

Let's look closely at the send method of KafkaProducer.

public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

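Both send methods are asynchronous: as soon as the record has been stored in the buffer of records waiting to be sent, the call returns. This makes it possible to send many records in parallel without blocking to wait for a response after each one.
The second overload, send(ProducerRecord<K, V> record, Callback callback), takes a callback that is invoked when the send request completes, whether it succeeded or failed. On success, the resulting RecordMetadata specifies the partition the record was sent to, the offset it was assigned, and the record's timestamp.
To block and send synchronously, call get() on the returned Future: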

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("my-topic", key, value);
// get() blocks until the send has completed and throws if it failed
producer.send(record).get();
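The difference between send returning at once and get() blocking can be seen in a toy model built only on the JDK. This is a sketch under stated assumptions, not Kafka's implementation: `ToyAsyncSender`, its `Pending` holder, and the `demo` method are hypothetical names, and the "offset" is just a counter incremented by a background thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Toy stand-in for an asynchronous producer (hypothetical, NOT Kafka's code). */
class ToyAsyncSender implements AutoCloseable {
    /** A buffered record plus the future that will report its assigned offset. */
    private static final class Pending {
        final String value;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        Pending(String value) { this.value = value; }
    }

    private final BlockingQueue<Pending> buffer = new LinkedBlockingQueue<>();
    private final AtomicLong nextOffset = new AtomicLong();
    private volatile boolean running = true;
    private final Thread ioThread;

    ToyAsyncSender() {
        ioThread = new Thread(this::drain, "toy-io-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Stores the record in the buffer and returns immediately. */
    Future<Long> send(String value) {
        Pending pending = new Pending(value);
        buffer.add(pending);
        return pending.result;
    }

    /** Background loop: takes buffered records in order and "acknowledges" them. */
    private void drain() {
        try {
            while (running || !buffer.isEmpty()) {
                Pending pending = buffer.poll(10, TimeUnit.MILLISECONDS);
                if (pending != null) {
                    pending.result.complete(nextOffset.getAndIncrement());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the background thread after the buffer has been drained. */
    @Override
    public void close() {
        running = false;
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Sends two records without waiting, then blocks on both futures. */
    static long[] demo() {
        try (ToyAsyncSender sender = new ToyAsyncSender()) {
            Future<Long> first = sender.send("a");   // returns at once
            Future<Long> second = sender.send("b");  // returns at once
            return new long[] { first.get(), second.get() }; // blocks here
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling get() right after every send, as in the synchronous snippet, turns each send into a full round trip; collecting the futures first and waiting afterwards keeps the sends overlapped.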

For fully asynchronous sending, pass a Callback that is invoked when the request completes:

ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("the-topic", key, value);
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            // the send failed
            e.printStackTrace();
        } else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});

Multi-threaded concurrent sending

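To take full advantage of Kafka's high throughput, the producer side can send messages concurrently from multiple threads. As noted earlier, KafkaProducer is thread-safe, so a single KafkaProducer instance can be shared across threads.
The Kafka configuration class used here, KafkaCommonConfig: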

package com.molyeo.kafka;

/**
 * Created by zhangkh on 2018/7/10.
 */
public class KafkaCommonConfig {
    public static String BOOTSTRAP_SERVERS = "LAPTOP-2CBRDCI0:9092";
    public static String ACKS = "all";
    public static int RETRIES = 0;
    public static int BATCH_SIZE = 16384;
    public static int LINGER_MS = 1;
    public static int BUFFER_MEMORY = 33554432;
    public static String KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";
    public static String VALUE_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";
}
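Each constant in KafkaCommonConfig corresponds to one producer configuration key. The sketch below spells those keys out as plain strings and uses only java.util.Properties, so it runs without the Kafka client on the classpath. The class name `ProducerProps` and the `build` method are hypothetical; the values mirror the constants above (16384 bytes is 16 KiB, 33554432 bytes is 32 MiB).

```java
import java.util.Properties;

/** Hypothetical helper showing the config keys behind the KafkaCommonConfig constants. */
class ProducerProps {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // broker address list
        props.put("acks", "all");                  // wait for all in-sync replicas
        props.put("retries", 0);                   // do not retry failed sends
        props.put("batch.size", 16 * 1024);        // per-partition batch size in bytes
        props.put("linger.ms", 1);                 // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 32 * 1024 * 1024); // total send buffer in bytes
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

With the Kafka client available, the resulting Properties object could be passed to the KafkaProducer constructor in the same way as the result of getProducerConfig().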

Sending messages

package com.molyeo.kafka;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by zhangkh on 2018/7/5.
 */
public class MultiKafkaProducerDemo {
    private static final int PRODUCER_THREAD_NUM = 5;

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(PRODUCER_THREAD_NUM);
        // One producer instance shared by all worker threads
        Producer<String, String> producer = new KafkaProducer<String, String>(getProducerConfig());
        String topic = "TEST";
        try {
            for (int i = 0; i < 20; i++) {
                Thread.sleep(20);
                String key = Integer.toString(i);
                String value = Long.toString(System.currentTimeMillis());
                // The partition is set explicitly to i % 3, so the topic needs at least 3 partitions
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, i % 3, key, value);
                executorService.submit(new CommonProducerThread<>(producer, record));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            //Block for a while
            Thread.sleep(60 * 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
            executorService.shutdown();
        }
    }

    public static Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaCommonConfig.BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.ACKS_CONFIG, KafkaCommonConfig.ACKS);
        props.put(ProducerConfig.RETRIES_CONFIG, KafkaCommonConfig.RETRIES);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, KafkaCommonConfig.BATCH_SIZE);
        props.put(ProducerConfig.LINGER_MS_CONFIG, KafkaCommonConfig.LINGER_MS);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, KafkaCommonConfig.BUFFER_MEMORY);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaCommonConfig.KEY_SERIALIZER_CLASS);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaCommonConfig.VALUE_SERIALIZER_CLASS);
        return props;
    }
}

class CommonProducerThread<K, V> implements Runnable {
    Logger logger = LoggerFactory.getLogger(CommonProducerThread.class.getSimpleName());
    private final Producer<K, V> producer;
    private final ProducerRecord<K, V> record;

    public CommonProducerThread(Producer<K, V> producer, ProducerRecord<K, V> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        logger.info("prepare to send msg:thread name={},key={},value={}",
                Thread.currentThread().getName(), record.key(), record.value());
        producer.send(record,
                new ProducerAckCallback<>(System.currentTimeMillis(), record.key(), record.value()));
    }
}

class ProducerAckCallback<K, V> implements Callback {
    Logger logger = LoggerFactory.getLogger(ProducerAckCallback.class.getSimpleName());
    private final long startTime;
    private final K key;
    private final V value;

    public ProducerAckCallback(long startTime, K key, V value) {
        this.startTime = startTime;
        this.key = key;
        this.value = value;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        // A null exception means the send succeeded
        if (exception == null) {
            logger.info("send success:key {},value {}, sent to partition {},offset {} in {} ms",
                    key, value, metadata.partition(), metadata.offset(), elapsedTime);
        } else {
            logger.error("send failed:key {},value {}", key, value, exception);
        }
    }
}
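The demo waits a fixed 60 seconds before closing the producer. One possible alternative, sketched here with only the JDK, is to shut the pool down and wait for the submitted tasks with awaitTermination, so that main continues as soon as the work is done. `GracefulShutdownSketch`, `runTasks`, and the `sent` counter are hypothetical; the counter stands in for the producer.send call, and the 30-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: wait for worker tasks instead of sleeping a fixed time. */
class GracefulShutdownSketch {
    /** Runs taskCount tasks on a fixed pool and returns how many of them ran. */
    static int runTasks(int taskCount, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger sent = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // Stand-in for: producer.send(record, callback)
            pool.execute(() -> { sent.incrementAndGet(); });
        }
        pool.shutdown(); // accept no new tasks, let queued ones finish
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // give up on tasks that are still pending
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        // In the real demo, producer.close() would be called at this point.
        return sent.get();
    }
}
```

In the real program the order would be: shut down the executor, wait for it to terminate, then call producer.close(), which blocks until previously sent requests have completed.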

When reposting, please cite the source: https://www.heiqu.com/zydzxz.html