超详细kafka教程来啦 (4)

配置

#kafka spring.kafka.bootstrap-servers=192.168.157.200:9092 # 发生错误后,消息重发的次数 spring.kafka.producer.retries=0 spring.kafka.producer.batch-size=16384 # 设置生产者内存缓冲区的大小。 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.acks=1 #消费者 #自动提交的时间间隔 spring.kafka.consumer.auto-commit-interval=1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 在侦听器容器中运行的线程数。 spring.kafka.listener.concurrency=5 #listner负责ack,每调用一次,就立即commit spring.kafka.listener.ack-mode=manual_immediate spring.kafka.listener.missing-topics-fatal=false

producer

@Component public class MyKafkaProducer { @Autowired private KafkaTemplate<String,Object> kafkaTemplate; public void send(String topic,Object object){ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable ex) { System.out.println("发送消息失败"+ex.getMessage()); } @Override public void onSuccess(SendResult<String, Object> result) { System.out.println("发送消息成功"+result); } }); }

consumer

@Component public class MyKafkaConsumer { @KafkaListener(topics = "test1",groupId = "group_test") public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){ Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); System.out.println("group_test 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); } } @KafkaListener(topics = "test1",groupId = "group_test2") public void consumer2(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){ Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); System.out.println("group_test2 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); } } }

测试

超详细kafka教程来啦

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwjzs.html