7) 相当于Kafka 中"isolation.level", "read_committed" ,指定KafkaConsumer只应读取非事务性消息,或从其输入主题中提交事务性消息。流处理应用程序通常在多个读取处理写入阶段处理其数据,每个阶段使用前一阶段的输出作为其输入。通过指定read_committed模式,我们可以在所有阶段完成一次处理。针对"Exactly-once" 语义,支持Kafka 0.11版本。
.withReadCommitted()8) 设置Kafka是否自动提交属性"AUTO_COMMIT",默认为自动提交,使用Beam 的方法来设置。
set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize) .commitOffsetsInFinalize()9) 设置是否返回Kafka的其他数据,例如offset 信息和分区信息,不用可以去掉。
.withoutMetadata() // PCollection<KV<Long, String>>10) 设置只返回values值,不用返回key。例如 PCollection<String>,而不是PCollection<Long,String>。
.apply(Values.<String>create()) // PCollection<String>KafkaIO写操作
写操作跟读操作配置基本相似,我们看一下具体代码。
PCollection<KV<Long, String>> kvColl = ...; kvColl.apply(KafkaIO.<Long, String>write() .withBootstrapServers("broker_1:9092,broker_2:9092") .withTopic("results") .withKeySerializer(LongSerializer.class) .withValueSerializer(StringSerializer.class) // You can further customize KafkaProducer used to write the records by adding more // settings for ProducerConfig. e.g, to enable compression : .updateProducerProperties(ImmutableMap.of("compression.type", "gzip")) // You set publish timestamp for the Kafka records. .withInputTimestamp() // element timestamp is used while publishing to Kafka // or you can also set a custom timestamp with a function. .withPublishTimestampFunction((elem, elemTs) -> ...) // Optionally enable exactly-once sink (on supported runners). See JavaDoc for withEOS(). .withEOS(20, "eos-sink-group-id"); );下面这个是Kafka里面比较重要的一个属性设置,在Beam中是这样使用的,非常简单,但是要注意这个属性.withEOS 其实就是Kafka中"Exactly-once"。
.withEOS(20, "eos-sink-group-id");在写入Kafka时完全一次性地提供语义,这使得应用程序能够在Beam管道中的一次性语义之上提供端到端的一次性保证。它确保写入接收器的记录仅在Kafka上提交一次,即使在管道执行期间重试某些处理也是如此。重试通常在应用程序重新启动时发生(如在故障恢复中)或者在重新分配任务时(如在自动缩放事件中)。Flink runner通常为流水线的结果提供精确一次的语义,但不提供变换中用户代码的副作用。如果诸如Kafka接收器之类的转换写入外部系统,则这些写入可能会多次发生。
在此处启用EOS时,接收器转换将兼容的Beam Runners中的检查点语义与Kafka中的事务联系起来,以确保只写入一次记录。由于实现依赖于runners checkpoint语义,因此并非所有runners都兼容。Beam中FlinkRunner针对Kafka 0.11+版本才支持,然而Dataflow runner和Spark runner如果操作kafkaIO是完全支持的。
关于性能的注意事项
"Exactly-once" 在接收初始消息的时候,除了将原来的数据进行格式化转换外,还经历了2个序列化 - 反序列化循环。根据序列化的数量和成本,CPU可能会涨的很明显。通过写入二进制格式数据(即在写入Kafka接收器之前将数据序列化为二进制数据)可以降低CPU成本。
关于参数
numShards——设置接收器并行度。存储在Kafka上的状态元数据,使用sinkGroupId存储在许多虚拟分区中。一个好的经验法则是将其设置为Kafka主题中的分区数。