sinkGroupId——用于在Kafka上将少量状态存储为元数据的组ID。它类似于与KafkaConsumer一起使用的使用groupID。每个作业都应使用唯一的groupID,以便重新启动/更新作业保留状态以确保一次性语义。状态是通过Kafka上的接收器事务原子提交的。有关更多信息,请参阅KafkaProducer.sendOffsetsToTransaction(Map,String)。接收器在初始化期间执行多个健全性检查以捕获常见错误,以便它不会最终使用似乎不是由同一作业写入的状态。
五.Apache Beam Flink源码剖析 Apache Beam FlinkRunner对 Flink支持依赖情况Flink 是一个流和批处理的统一的计算框架,Apache Beam 跟Flink API做了无缝集成。在Apache Beam中对Flink 的操作主要是 FlinkRunner.java,Apache Beam支持不同版本的flink 客户端。我根据不同版本列了一个Flink 对应客户端支持表如下:
图5-1 FlinkRunner与Flink依赖关系表
从图5-1中可以看出,Apache Beam 对Flink 的API支持的更新速度非常快,从源码可以看到2.0.0版本之前的FlinkRunner是非常low的,并且直接拿Flink的实例做为Beam的实例,封装的效果也比较差。但是从2.0.0 版本之后 ,Beam就像打了鸡血一样API更新速度特别快,抛弃了以前的冗余,更好地跟Flink集成,让人眼前一亮。
Apache Beam Flink 源码解析因为Beam在运行的时候都是显式指定Runner,在FlinkRunner源码中只是成了简单的统一入口,代码非常简单,但是这个入口中有一个比较关键的接口类FlinkPipelineOptions。
请看代码:
/** Provided options. */ private final FlinkPipelineOptions options;通过这个类我们看一下Apache Beam到底封装了哪些Flink方法。
首先FlinkPipelineOptions是一个接口类,但是它继承了 PipelineOptions、ApplicationNameOptions、StreamingOptions 三个接口类,第一个PipelineOptions大家应该很熟悉了,用于基本管道创建;第二个ApplicationNameOptions 用于设置应用程序名字;第三个用于判断是流式数据还是批数据。源代码如下:
public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { //.... }1) 设置 Flink Master 方法 ,这个方法用于设置Flink 集群地址的Master地址。可以填写IP和端口,或者是hostname 和端口,默认local 。当然测试也可以是单机的,在Flink 1.4 利用 start-local.sh 启动,而到了1.5以上就去掉了这个脚本,本地直接换成了 start-cluster.sh。大家测试的时候需要注意一下。
/** * The url of the Flink JobManager on which to execute pipelines. This can either be the the * address of a cluster JobManager, in the form "host:port" or one of the special Strings * "[collection]" will execute the pipeline on Java Collections while "[auto]" will let the system */ @Description( "Address of the Flink Master where the Pipeline should be executed. Can"+ "[collection] or [auto].") void setFlinkMaster(String value);2) 设置 Flink 的并行数,属于Flink 高级API里面的属性。设置合适的parallelism能提高运算效率,太多了和太少了都不行。设置parallelism有多种方式,优先级为api>env>p>file。
@Description("The degree of parallelism to be used when distributing operations onto workers.") @Default.InstanceFactory(DefaultParallelismFactory.class) Integer getParallelism(); void setParallelism(Integer value);3) 设置连续检查点之间的间隔时间(即当前的快照)用于容错的管道状态。
@Description("The interval between consecutive checkpoints (i.e. snapshots of the current" @Default.Long(-1L) Long getCheckpointingInterval(); void setCheckpointingInterval(Long interval)4) 定义一致性保证的检查点模式,默认为"AT_LEAST_ONCE",在Beam的源码中定义了一个枚举类CheckpointingMode,除了默认的"AT_LEAST_ONCE",还有"EXACTLY_ONCE"。
"AT_LEAST_ONCE":这个模式意思是系统将以一种更简单地方式来对operator和udf的状态进行快照:在失败后进行恢复时,在operator的状态中,一些记录可能会被重放多次。