Apache Beam实战指南(6)

sinkGroupId——用于在Kafka上将少量状态存储为元数据的组ID。它类似于与KafkaConsumer一起使用的使用groupID。每个作业都应使用唯一的groupID,以便重新启动/更新作业保留状态以确保一次性语义。状态是通过Kafka上的接收器事务原子提交的。有关更多信息,请参阅KafkaProducer.sendOffsetsToTransaction(Map,String)。接收器在初始化期间执行多个健全性检查以捕获常见错误,以便它不会最终使用似乎不是由同一作业写入的状态。

五.Apache Beam Flink源码剖析 Apache Beam FlinkRunner对 Flink支持依赖情况

Flink 是一个流和批处理的统一的计算框架,Apache Beam 跟Flink API做了无缝集成。在Apache Beam中对Flink 的操作主要是 FlinkRunner.java,Apache Beam支持不同版本的flink 客户端。我根据不同版本列了一个Flink 对应客户端支持表如下:

Apache Beam实战指南 | 玩转KafkaIO与Flink

图5-1 FlinkRunner与Flink依赖关系表

从图5-1中可以看出,Apache Beam 对Flink 的API支持的更新速度非常快,从源码可以看到2.0.0版本之前的FlinkRunner是非常low的,并且直接拿Flink的实例做为Beam的实例,封装的效果也比较差。但是从2.0.0 版本之后 ,Beam就像打了鸡血一样API更新速度特别快,抛弃了以前的冗余,更好地跟Flink集成,让人眼前一亮。

Apache Beam Flink 源码解析

因为Beam在运行的时候都是显式指定Runner,在FlinkRunner源码中只是成了简单的统一入口,代码非常简单,但是这个入口中有一个比较关键的接口类FlinkPipelineOptions。

请看代码:

/** Provided options. */ private final FlinkPipelineOptions options;

通过这个类我们看一下Apache Beam到底封装了哪些Flink方法。

首先FlinkPipelineOptions是一个接口类,但是它继承了 PipelineOptions、ApplicationNameOptions、StreamingOptions 三个接口类,第一个PipelineOptions大家应该很熟悉了,用于基本管道创建;第二个ApplicationNameOptions 用于设置应用程序名字;第三个用于判断是流式数据还是批数据。源代码如下:

public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { //.... }

1) 设置 Flink Master 方法 ,这个方法用于设置Flink 集群地址的Master地址。可以填写IP和端口,或者是hostname 和端口,默认local 。当然测试也可以是单机的,在Flink 1.4 利用 start-local.sh 启动,而到了1.5以上就去掉了这个脚本,本地直接换成了 start-cluster.sh。大家测试的时候需要注意一下。

/** * The url of the Flink JobManager on which to execute pipelines. This can either be the the * address of a cluster JobManager, in the form "host:port" or one of the special Strings * "[collection]" will execute the pipeline on Java Collections while "[auto]" will let the system */ @Description( "Address of the Flink Master where the Pipeline should be executed. Can"+ "[collection] or [auto].") void setFlinkMaster(String value);

2) 设置 Flink 的并行数,属于Flink 高级API里面的属性。设置合适的parallelism能提高运算效率,太多了和太少了都不行。设置parallelism有多种方式,优先级为api>env>p>file。

@Description("The degree of parallelism to be used when distributing operations onto workers.") @Default.InstanceFactory(DefaultParallelismFactory.class) Integer getParallelism(); void setParallelism(Integer value);

3) 设置连续检查点之间的间隔时间(即当前的快照)用于容错的管道状态。

@Description("The interval between consecutive checkpoints (i.e. snapshots of the current" @Default.Long(-1L) Long getCheckpointingInterval(); void setCheckpointingInterval(Long interval)

4) 定义一致性保证的检查点模式,默认为"AT_LEAST_ONCE",在Beam的源码中定义了一个枚举类CheckpointingMode,除了默认的"AT_LEAST_ONCE",还有"EXACTLY_ONCE"。

"AT_LEAST_ONCE":这个模式意思是系统将以一种更简单地方式来对operator和udf的状态进行快照:在失败后进行恢复时,在operator的状态中,一些记录可能会被重放多次。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/b0750d480662aeccb8a7dfff6579955c.html