Flink 1.9 Integration with Kafka (3)

```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

val stream = env.addSource(myConsumer)
stream.print()
```
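`CustomWatermarkEmitter` above is not part of the connector API; it stands for whatever timestamp/watermark assigner you supply. A minimal Java sketch of what such an assigner could look like in Flink 1.9, assuming (purely for illustration) that each record is a string whose prefix before the first comma is a millisecond timestamp:

```java
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical assigner: extracts the event timestamp from each record and
// emits a watermark for every element (punctuated watermarks).
public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // assumption: records look like "<timestampMillis>,<payload>"
        return Long.parseLong(element.split(",", 2)[0]);
    }

    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        // emit a watermark after every element, based on its timestamp
        return new Watermark(extractedTimestamp);
    }
}
```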

Kafka Producer

Like the consumer, the Kafka producer is named after the Kafka version: FlinkKafkaProducer011, FlinkKafkaProducer010, and so on.
For Kafka >= 1.0.0 it is simply called FlinkKafkaProducer.

Building a FlinkKafkaProducer

```java
DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);
```

```scala
val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
  "localhost:9092",          // broker list
  "my-topic",                // target topic
  new SimpleStringSchema)    // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)
```

You need to specify the broker list, the target topic, and a serialization schema.
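If you need to pass additional producer settings, there is also a constructor variant that takes the topic, the serialization schema, and a Properties object instead of a plain broker-list string. A minimal sketch (the retries value is just an illustrative Kafka setting; imports match the Java example above):

```java
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("retries", "3"); // illustrative Kafka producer setting

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(
        "my-topic",                 // target topic
        new SimpleStringSchema(),   // serialization schema
        props);                     // producer configuration
```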

Custom partitioning: by default, FlinkFixedPartitioner is used, which maps each parallel subtask of the Flink Kafka producer to a single Kafka partition.

You can extend the FlinkKafkaPartitioner class to implement your own partitioning, as sketched below.
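A minimal sketch of such a custom partitioner, assuming hash-based routing (the class name HashPartitioner is hypothetical):

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Hypothetical partitioner: routes each record to a partition derived from
// its hash code, so equal records always land in the same Kafka partition.
public class HashPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // partitions contains the partition IDs of the target topic;
        // floorMod keeps the index non-negative for any hash code
        return partitions[Math.floorMod(record.hashCode(), partitions.length)];
    }
}
```

It can then be plugged in through the producer constructors that accept an Optional partitioner, e.g. `new FlinkKafkaProducer011<>("my-topic", new SimpleStringSchema(), props, Optional.of(new HashPartitioner<>()))`.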

Complete code for consuming Kafka with Flink 1.9:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaConsumer {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // build the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        // start reading from the earliest offset
        myConsumer.setStartFromEarliest();

        DataStream<String> stream = env.addSource(myConsumer);

        env.enableCheckpointing(5000);
        stream.print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}
```
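To write the stream back out with the universal connector mentioned above (Kafka >= 1.0.0), a sink along these lines could be placed before `env.execute(...)` in the same main method. This is only a sketch: the output topic name is illustrative, and the import `org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer` is assumed:

```java
// universal producer (Kafka >= 1.0.0): target topic, serialization schema, producer config
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-output-topic",          // target topic (illustrative)
        new SimpleStringSchema(),   // serialization schema
        properties);                // reuses the Properties defined above

stream.addSink(myProducer);
```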

Project repository: https://github.com/tree1123/flink_demo_1.9

For more Flink content, follow 实时流式计算 (Real-Time Stream Processing).

