Kafka源码分析及图解原理之Producer端

  任何消息队列都是万变不离其宗都是3部分,消息生产者(Producer)、消息消费者(Consumer)和服务载体(在Kafka中用Broker指代)。那么本篇主要讲解Producer端,会有适当的图解帮助理解底层原理

 

Kafka源码分析及图解原理之Producer端

一.开发应用

  首先介绍一下开发应用,如何构建一个KafkaProducer及使用,还有一些重要参数的简介。

1.1 一个栗子

1 /** 2 * Kafka Producer Demo实例类。 3 * 4 * @author GrimMjx 5 */ 6 public class ProducerDemo { 7 public static void main(String[] args) throws ExecutionException, InterruptedException { 8 Properties prop = new Properties(); 9 prop.put("client.id", "DemoProducer"); 10 11 // 以下三个参数必须指定 12 // 用于创建与Kafka broker服务器的连接,集群的话则用逗号分隔 13 prop.put("bootstrap.servers", "localhost:9092"); 14 // 消息的key序列化方式 15 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 16 // 消息的value序列化方式 17 prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 18 19 // 以下参数为可配置选项 20 prop.put("acks", "-1"); 21 prop.put("retries", "3"); 22 prop.put("batch.size", "323840"); 23 prop.put("linger.ms", "10"); 24 prop.put("buffer.memory", "33554432"); 25 prop.put("max.block.ms", "3000"); 26 27 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); 28 try { 29 // 异步发送,继续发送消息不用等待。当有结果返回时,callback会被通知执行 30 producer.send(new ProducerRecord<String, String>("test", "key1", "value1"), 31 new Callback() { 32 // 返回结果RecordMetadata记录元数据包括了which partition的which offset 33 public void onCompletion(RecordMetadata metadata, Exception e) { 34 // 发送成功 35 if (e == null) { 36 System.out.println("The offset of the record we just sent is: " + metadata.offset()); 37 38 // 发送失败 39 } else { 40 if (e instanceof RetriableException) { 41 // 处理可重试的异常,比如分区leader副本不可用 42 // 一般用retries参数来设置重置,毕竟这里也没有什么其他能做的,也是同样的重试发送消息 43 } else { 44 // 处理不可重试异常 45 } 46 } 47 } 48 } 49 ); 50 51 // 同步发送,send方法返回Future,然后get。在没有返回结果一直阻塞 52 producer.send(new ProducerRecord<String, String>("test", "key1", "value1")).get(); 53 54 } finally { 55 // producer运行的时候占用系统额外资源,最后一定要关闭 56 producer.close(); 57 } 58 } 59 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspgdg.html