Integrating Flink 1.9 with Kafka (2)

Scala:

```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
stream.print()
```

Required arguments:

1. The topic name

2. A DeserializationSchema / KafkaDeserializationSchema for deserializing the data from Kafka

3. Configuration properties: "bootstrap.servers" and "group.id" (Kafka 0.8 additionally requires "zookeeper.connect")
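As a minimal sketch, the required properties can be assembled with plain java.util.Properties before they are handed to the consumer (the broker address and group id below are placeholder values, and "KafkaConsumerProps" is a made-up helper name):

```java
import java.util.Properties;

public class KafkaConsumerProps {
    // Builds the minimal property set the Flink Kafka consumer requires.
    public static Properties minimalProps(String brokers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers); // Kafka broker list
        props.setProperty("group.id", groupId);          // consumer group id
        return props;
    }

    public static void main(String[] args) {
        Properties props = minimalProps("localhost:9092", "test");
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("group.id"));
    }
}
```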

Configuring the starting position for consumption

Java:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

// start from specific offsets per partition
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

DataStream<String> stream = env.addSource(myConsumer);
```

Scala:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

// start from specific offsets per partition
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

val stream = env.addSource(myConsumer)
```

Checkpoints

With Flink's checkpointing enabled, the Flink Kafka Consumer consumes records from a topic and periodically checkpoints all of its Kafka offsets, together with the state of other operations, in a consistent manner. If the job fails, Flink restores the streaming program to the state of the latest checkpoint and re-consumes the records from Kafka starting from the offsets stored in the checkpoint.

If checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset-committing capability of the internally used Kafka client.

If checkpointing is enabled, the Flink Kafka Consumer commits the offsets stored in the checkpointed state once the checkpoint completes.
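The offset bookkeeping described above can be illustrated with a small standalone sketch. This is conceptual only and does not use Flink's actual state API; "OffsetCheckpointSketch" is a made-up name:

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual sketch: per-partition offsets are snapshotted when a checkpoint
// completes, and a failure recovers by rewinding to the snapshotted offsets.
public class OffsetCheckpointSketch {
    private final Map<Integer, Long> currentOffsets = new HashMap<>();
    private Map<Integer, Long> lastCheckpoint = new HashMap<>();

    // Record that `offset` of `partition` has been consumed;
    // the next read would start at offset + 1.
    public void consume(int partition, long offset) {
        currentOffsets.put(partition, offset + 1);
    }

    // Snapshot the current offsets, as a completed checkpoint would.
    public void checkpoint() {
        lastCheckpoint = new HashMap<>(currentOffsets);
    }

    // On failure, resume from the offsets of the latest completed checkpoint.
    public Map<Integer, Long> restore() {
        return new HashMap<>(lastCheckpoint);
    }

    public static void main(String[] args) {
        OffsetCheckpointSketch sketch = new OffsetCheckpointSketch();
        sketch.consume(0, 41L);
        sketch.checkpoint();
        sketch.consume(0, 99L); // progress made after the checkpoint is lost on failure
        System.out.println(sketch.restore().get(0)); // 42
    }
}
```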

Java:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```

Scala:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```

Partition Discovery

The Flink Kafka Consumer supports discovering dynamically created Kafka partitions and consumes them with exactly-once guarantees. Partition discovery is disabled by default; it is enabled by setting "flink.partition-discovery.interval-millis" to a non-negative value in the supplied properties.

Topics can also be subscribed to with a regular expression:

Java:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);
...
```
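To get a feel for which topic names a pattern like the one above selects, a full-string match with plain java.util.regex can be checked in isolation (the topic names and the "TopicPatternDemo" class are invented for illustration):

```java
import java.util.regex.Pattern;

public class TopicPatternDemo {
    private static final Pattern TOPIC_PATTERN = Pattern.compile("test-topic-[0-9]");

    // True when the whole topic name matches the pattern.
    public static boolean matches(String topic) {
        return TOPIC_PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("test-topic-1"));  // true
        System.out.println(matches("test-topic-42")); // false: [0-9] matches a single digit
        System.out.println(matches("other-topic"));   // false
    }
}
```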

Scala:

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
...
```

Timestamps and Watermarks

In many situations, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. In addition, users may want to emit watermarks either periodically or in an irregular fashion.

We can define Timestamp Extractors / Watermark Emitters and pass them to the consumer in the following way:

Java:

```java
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env.addSource(myConsumer);
stream.print();
```
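One common watermarking strategy is to have the watermark trail the largest event timestamp seen so far by a fixed lag, so that bounded out-of-order events can still be handled. The standalone sketch below (the "WatermarkSketch" class name is made up and is not a Flink interface) illustrates just that arithmetic:

```java
// Conceptual sketch of a bounded-out-of-orderness watermark: the watermark
// trails the maximum event timestamp seen so far by a fixed lag, so events
// up to `maxLagMillis` late can still be assigned to their windows.
public class WatermarkSketch {
    private long maxTimestamp = Long.MIN_VALUE;
    private final long maxLagMillis;

    public WatermarkSketch(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    // Track the largest event timestamp observed so far.
    public void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    // Current watermark: no event with a smaller timestamp is expected anymore.
    public long currentWatermark() {
        return maxTimestamp - maxLagMillis;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(500);
        wm.onEvent(1000);
        wm.onEvent(900); // out-of-order event does not move the watermark back
        System.out.println(wm.currentWatermark()); // 500
    }
}
```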
