```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

val stream = env.addSource(myConsumer)
stream.print()
```

Kafka Producer

Depending on the Kafka version, the producer class is named FlinkKafkaProducer011, FlinkKafkaProducer010, and so on.
For Kafka >= 1.0.0, the class is simply called FlinkKafkaProducer.
```java
DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",          // broker list
        "my-topic",                // target topic
        new SimpleStringSchema()); // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);
```

```scala
val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
  "localhost:9092",        // broker list
  "my-topic",              // target topic
  new SimpleStringSchema)  // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)
```

You need to specify the broker list, the target topic, and a serialization schema.
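The serialization schema's job is just to turn each record into the bytes that go onto the Kafka topic. As a hedged sketch of that contract, the class below mirrors what SimpleStringSchema does (UTF-8 encode/decode); the class name is hypothetical and the Flink (De)SerializationSchema interfaces are deliberately left out so the snippet stays self-contained:

```java
import java.nio.charset.StandardCharsets;

// Minimal stand-in for what SimpleStringSchema does: serialize a String to
// UTF-8 bytes for Kafka, and turn the bytes back into a String on the
// consumer side. A real schema would implement Flink's SerializationSchema
// and DeserializationSchema interfaces instead of being a plain class.
public class StringSchemaSketch {

    // producer side: String -> bytes written to the topic
    public byte[] serialize(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    // consumer side: bytes read from the topic -> String
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        StringSchemaSketch schema = new StringSchemaSketch();
        byte[] bytes = schema.serialize("hello kafka");
        System.out.println(schema.deserialize(bytes)); // prints hello kafka
    }
}
```

Any record type can be shipped this way as long as serialize/deserialize round-trip it losslessly.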
Custom partitioning: by default, a FlinkFixedPartitioner maps each parallel subtask of the Flink Kafka Producer to a single Kafka partition. To customize this behavior, implement your own FlinkKafkaPartitioner subclass.
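As an illustration, a custom partitioner that routes records with the same key to the same partition could look like the sketch below. The method signature mirrors FlinkKafkaPartitioner's partition(record, key, value, targetTopic, partitions), but the Flink base class is omitted here so the snippet stays self-contained; in a real job this logic would live in a subclass of FlinkKafkaPartitioner passed to the producer's constructor:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Core logic of a key-hashing partitioner. In a real job, this partition(...)
// method would override FlinkKafkaPartitioner<T>.partition (Flink dependency
// omitted here); `partitions` is the array of available partition ids for the
// target topic, as Flink would supply it.
public class KeyHashPartitioner {

    public int partition(String record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        if (key == null) {
            // no key attached: fall back to the first available partition
            return partitions[0];
        }
        // stable hash of the key bytes, mapped onto the available partitions,
        // so identical keys always land on the same partition
        int hash = Arrays.hashCode(key);
        return partitions[Math.abs(hash % partitions.length)];
    }

    public static void main(String[] args) {
        KeyHashPartitioner p = new KeyHashPartitioner();
        int[] partitions = {0, 1, 2};
        byte[] k = "user-42".getBytes(StandardCharsets.UTF_8);
        // the same key maps to the same partition on every call
        System.out.println(p.partition("a", k, null, "my-topic", partitions)
                == p.partition("b", k, null, "my-topic", partitions)); // prints true
    }
}
```

Note that `hash % partitions.length` is taken before `Math.abs`, so the index stays in range even for extreme hash values.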
Complete Flink 1.9 Kafka consumer code:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaConsumer {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");

        // build the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);

        // start reading from the earliest offset
        myConsumer.setStartFromEarliest();

        DataStream<String> stream = env.addSource(myConsumer);

        env.enableCheckpointing(5000);

        stream.print();

        env.execute("Flink Streaming Java API Skeleton");
    }
}
```

Project repository: https://github.com/tree1123/flink_demo_1.9
For more Flink content, follow 实时流式计算.