KafkaIO Read Operation
pipeline.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)

    // Above four are required configuration. Returns PCollection<KafkaRecord<Long, String>>.
    // The rest of the settings are optional:

    // you can further customize the KafkaConsumer used to read the records by adding more
    // settings for ConsumerConfig, e.g.:
    .updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))

    // set event times and watermark based on 'LogAppendTime'. To provide a custom
    // policy see withTimestampPolicyFactory(). withProcessingTime() is the default.
    // Use withCreateTime() with topics that have 'CreateTime' timestamps.
    .withLogAppendTime()

    // restrict reader to committed messages on Kafka (see method documentation).
    .withReadCommitted()

    // offsets consumed by the pipeline can be committed back.
    .commitOffsetsInFinalize()

    // finally, if you don't need Kafka metadata, you can drop it.
    .withoutMetadata() // PCollection<KV<Long, String>>
)
.apply(Values.<String>create()) // PCollection<String>

1) Specify the KafkaIO type parameters. As the code above shows, KafkaIO<K, V> is instantiated here with Long and String; other types can be used as well.
pipeline.apply(KafkaIO.<Long, String>read()
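The type parameters only need to match the deserializers configured in step 4 below. As a minimal sketch (reusing the broker and topic names from the example above), reading String keys and raw byte[] values would look like this:

pipeline.apply(KafkaIO.<String, byte[]>read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("my_topic")
    // the deserializer classes must match the declared K and V types
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class));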
2) Set the addresses of the Kafka cluster's brokers.

.withBootstrapServers("broker_1:9092,broker_2:9092")
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.4) 设置序列化类型。Apache Beam KafkaIO 在序列化的时候做了很大的简化,例如原生Kafka可能要通过Properties 类去设置 ,还要加上很长一段jar包的名字。
4) Set the deserializer types. Apache Beam KafkaIO simplifies this considerably: with the native Kafka client you would configure deserializers through a Properties object, spelling out long fully qualified class names.

With Beam KafkaIO:
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)

With the native Kafka client:
Properties props = new Properties();
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

5) Set the Kafka consumer properties; other consumer settings can be configured here as well. The code sets the consumer group:
.updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
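updateConsumerProperties accepts any standard ConsumerConfig key, so further settings are passed the same way. A sketch with a couple of common additions (the values are illustrative):

.updateConsumerProperties(ImmutableMap.<String, Object>of(
    "group.id", "my_beam_app_1",
    // start from the earliest available offset when no committed offset exists
    "auto.offset.reset", "earliest",
    // limit how many records a single poll() may return
    "max.poll.records", 500))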
6) Set the timestamp (event-time) policy for the records. The default, processing time, can be kept, or a custom policy supplied via withTimestampPolicyFactory(); the code uses the broker-assigned 'LogAppendTime':

.withLogAppendTime()
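For topics whose producers write 'CreateTime' timestamps, withCreateTime() can be used instead, as the comments in the full example note. It takes the maximum expected out-of-orderness, which bounds the watermark; the one-minute delay below is illustrative (Duration here is org.joda.time.Duration):

// event time taken from the producer-assigned 'CreateTime' timestamp;
// records may arrive up to one minute out of order
.withCreateTime(Duration.standardMinutes(1))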