消费者
public class SimpleKafkaConsumer { public static void main(String[] args) { Properties properties=new Properties(); properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("bootstrap.servers","192.168.157.200:9092"); //消费者组 properties.setProperty("group.id","group1"); KafkaConsumer consumer=new KafkaConsumer(properties); //订阅topic consumer.subscribe(Arrays.asList("test1")); while (true){ //拉取数据 ConsumerRecords poll=consumer.poll(100); ((ConsumerRecords) poll).forEach( data->{ System.out.println(((ConsumerRecord)data).value()); } ); } } } 三、高级特性 生产者特性生产者-确认模式
acks=0 :只发送不管有没有写入到broker
acks=1:只写入到leader就认为成功
acks=-1/all:要求ISR列表里所有follower都同步过去,才算成功
将acks设置为-1就一定能保证消息不丢吗?
答:不是的。如果partition只有一个副本,也就是光有leader没有follower,那么宕机了消息一样会丢失。所以至少也要设置2个及以上的副本才行。
另外,要提高数据可靠性,设置acks=-1的同时,也要设置min.insync.replicas(最小副本数,默认1)
生产者-同步发送
public void syncSend() throws ExecutionException, InterruptedException { Properties properties=new Properties(); properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("bootstrap.servers","192.168.157.200:9092"); KafkaProducer producer=new KafkaProducer(properties); ProducerRecord record=new ProducerRecord("test1","这是一条消息"); Future future = producer.send(record); //同步发送消息方法1 Object o = future.get(); //同步发送消息方法2 producer.send(record); producer.flush(); producer.close(); }生产者-异步发送
public void asyncSend(){ Properties properties=new Properties(); properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("bootstrap.servers","192.168.157.200:9092"); //生产者在发送批次之前等待更多消息加入批次的时间 properties.setProperty("linger.ms","1"); properties.setProperty("batch.size","20240"); KafkaProducer producer=new KafkaProducer(properties); ProducerRecord record=new ProducerRecord("test1","这是一条消息"); //异步发送方法1 producer.send(record); //异步发送方法2 producer.send(record,((metadata, exception) -> { if(exception==null){ System.out.println("record="+record.value()); } })); }生产者-顺序保证
同步请求发送+broker只能一个请求一个请求的接
public void sequenceGuarantee(){ Properties properties=new Properties(); properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("bootstrap.servers","192.168.157.200:9092"); //生产者在收到服务器响应之前可以发送多少个消息,保证一个一个的发 properties.setProperty("max.in.flight.requests.per.connection","1"); KafkaProducer producer=new KafkaProducer(properties); ProducerRecord record=new ProducerRecord("test1","这是一条消息"); //同步发送 producer.send(record); producer.flush(); producer.close(); }生产者-消息可靠性传递
事务+幂等
这里的事务就是,发送100条消息,如果其中报错了,那么所有的消息都不能被消费者读取。
public static void transaction(){ Properties properties=new Properties(); properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //重试次数 properties.setProperty("retries","3"); properties.setProperty("bootstrap.servers","192.168.157.200:9092"); //生产者发送消息幂等,此时会默认把acks设置为all properties.setProperty("enable.idempotence","true"); //事务id properties.setProperty("transactional.id","tx0001"); ProducerRecord record=new ProducerRecord("test1","这是一条消息"); KafkaProducer producer=new KafkaProducer(properties); try { producer.initTransactions(); producer.beginTransaction(); for (int i = 0; i < 100; i++) { producer.send(record,(recordMetadata, e) -> { if(e!=null){ producer.abortTransaction(); throw new KafkaException("send error"+e.getMessage()); } }); } producer.commitTransaction(); } catch (ProducerFencedException e) { producer.abortTransaction(); e.printStackTrace(); } producer.close(); } 消费者特性消费者-消费者组