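scala:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// env is the job's StreamExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print()

The consumer requires: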
1. The topic name
2. A DeserializationSchema / KafkaDeserializationSchema for deserializing the data read from Kafka
3. Configuration properties: "bootstrap.servers" and "group.id" (Kafka 0.8 additionally requires "zookeeper.connect")
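To make item 2 more concrete: a deserialization schema turns the raw bytes of a Kafka record into a Java object. The sketch below shows only that idea, using nothing but the JDK. `BytesDecoder` and `DecodingSketch` are hypothetical names for illustration and are not Flink classes. It assumes record values are UTF-8 text, which is also the default charset of SimpleStringSchema.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the role a DeserializationSchema plays; not a Flink interface.
interface BytesDecoder<T> {
    T decode(byte[] message);
}

final class DecodingSketch {
    // Decodes a record value as UTF-8 text (assumption: values are UTF-8 strings).
    static final BytesDecoder<String> UTF8_STRING =
            message -> new String(message, StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(UTF8_STRING.decode(raw));
    }
}
```

For anything other than plain strings (JSON, Avro, and so on), the same step would parse the bytes into a domain object instead.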

Configuring the start position

java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
// Pick one of the following; each call overrides the previous one.
myConsumer.setStartFromEarliest();      // start from the earliest record possible
myConsumer.setStartFromLatest();        // start from the latest record
myConsumer.setStartFromTimestamp(...);  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets();  // the default behaviour

// Start from specific offsets per partition:
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

DataStream<String> stream = env.addSource(myConsumer);

scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
// Pick one of the following; each call overrides the previous one.
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

// Start from specific offsets per partition:
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

val stream = env.addSource(myConsumer)

Checkpointing

With Flink's checkpointing enabled, the Flink Kafka Consumer consumes records from a topic and periodically checkpoints all of its Kafka offsets, together with the state of the other operators, in a consistent manner. If the job fails, Flink restores the streaming program to the state of the latest checkpoint and re-consumes the records from Kafka, starting from the offsets stored in that checkpoint.
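The restore behaviour described above can be illustrated with a small library-free simulation. This is only a sketch of the idea for a single partition; `CheckpointReplaySketch` and all of its members are hypothetical names and not Flink or Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Library-free simulation of "restore to the checkpointed offset"; not a Flink class.
final class CheckpointReplaySketch {
    private final List<String> partition;   // records of one partition, list index = offset
    private int nextOffset = 0;             // offset of the next record to read
    private int checkpointedOffset = 0;     // offset stored in the latest checkpoint

    CheckpointReplaySketch(List<String> partition) {
        this.partition = partition;
    }

    // Reads up to n records starting at nextOffset and returns them.
    List<String> poll(int n) {
        List<String> out = new ArrayList<>();
        while (n-- > 0 && nextOffset < partition.size()) {
            out.add(partition.get(nextOffset++));
        }
        return out;
    }

    // Stores the current read position, as a checkpoint would.
    void checkpoint() {
        checkpointedOffset = nextOffset;
    }

    // After a failure, reading resumes from the checkpointed offset.
    void restore() {
        nextOffset = checkpointedOffset;
    }

    public static void main(String[] args) {
        CheckpointReplaySketch c =
                new CheckpointReplaySketch(Arrays.asList("a", "b", "c", "d", "e"));
        c.poll(2);       // reads a, b
        c.checkpoint();  // checkpoint stores offset 2
        c.poll(2);       // reads c, d; assume the job fails right after this
        c.restore();     // back to offset 2
        System.out.println(c.poll(3)); // prints [c, d, e]
    }
}
```

Records c and d are read twice by the source in this run. The state of the rest of the job is rolled back to the same checkpoint, which is why the replay does not double-count them in Flink's managed state.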
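If checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing of the Kafka client it uses internally.
If checkpointing is enabled, the Flink Kafka Consumer commits the offsets stored in the checkpoint state whenever a checkpoint completes.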
java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

Partition discovery

The Flink Kafka Consumer supports discovering dynamically created Kafka partitions and consumes them with exactly-once guarantees.
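According to the connector documentation for these consumer classes, partition discovery is disabled by default and is switched on by setting a non-negative value for `flink.partition-discovery.interval-millis` in the consumer properties. A minimal sketch, with a hypothetical helper name and an arbitrary 10-second interval:

```java
import java.util.Properties;

// Hypothetical helper; only the property key is taken from the Flink Kafka connector.
final class PartitionDiscoveryProps {
    // Returns a copy of the given properties with partition discovery enabled.
    static Properties withDiscovery(Properties base, long intervalMillis) {
        Properties p = new Properties();
        p.putAll(base);
        p.setProperty("flink.partition-discovery.interval-millis", Long.toString(intervalMillis));
        return p;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092");
        base.setProperty("group.id", "test");
        Properties p = withDiscovery(base, 10_000L);
        System.out.println(p.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```

The resulting Properties object is then passed to the consumer constructor in the same way as in the other examples.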
Topics can also be selected with a regular expression:

java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);
...

scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
...

Timestamps and watermarks

In many cases the timestamp of a record is embedded, explicitly or implicitly, in the record itself. In addition, the user may want to emit watermarks either periodically or at irregular intervals.
You can define a Timestamp Extractor / Watermark Emitter and pass it to the consumer as follows:
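java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
// CustomWatermarkEmitter is your own timestamp extractor / watermark emitter
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

// print() returns a sink rather than a DataStream, so it is called separately
DataStream<String> stream = env.addSource(myConsumer);
stream.print();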
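
CustomWatermarkEmitter above is a user-defined class, and the source does not show its body. As an illustration of the kind of logic such a class usually contains, here is a library-free sketch of periodic watermarking with a bounded out-of-orderness. Everything in it is an assumption: the class name, the record layout "<epochMillis>,<payload>", and the 5-second bound. In a real job this logic would sit inside an implementation of one of Flink's watermark assigner interfaces (for the Flink versions used here, AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks).

```java
// Library-free sketch of bounded-out-of-orderness watermarking; not a Flink class.
final class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Assumed (hypothetical) record layout: "<epochMillis>,<payload>".
    // Assumes well-formed records; a record without a comma would throw.
    long extractTimestamp(String record) {
        long ts = Long.parseLong(record.substring(0, record.indexOf(',')));
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        return ts;
    }

    // The watermark trails the highest timestamp seen so far by the allowed lateness.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        WatermarkSketch w = new WatermarkSketch(5000L);
        w.extractTimestamp("10000,a");
        w.extractTimestamp("8000,b");   // out of order, does not move the maximum
        w.extractTimestamp("12000,c");
        System.out.println(w.currentWatermark()); // prints 7000
    }
}
```

Assigning timestamps on the consumer itself, rather than on the stream after the source, lets the connector track watermarks per Kafka partition.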