[源码分析] 从源码入手看 Flink Watermark 之传播过程 (3)

这种精度的watermark能够更好的描述系统内部状态。能够更简单的跟踪数据在系统各个buffer中的流转状态,有助于排查数据堵塞问题。

0x03. Flink 程序结构 & 核心概念 1. 程序结构

Flink程序像常规的程序一样对数据集合进行转换操作,每个程序由下面几部分组成:

获取一个执行环境

加载/创建初始化数据

指定对于数据的transformations操作

指定计算的输出结果(打印或者输出到文件)

触发程序执行

flink流式计算的核心概念,就是将数据从输入流一个个传递给Operator进行链式处理,最后交给输出流的过程。对数据的每一次处理在逻辑上成为一个operator,并且为了本地化处理的效率起见,operator之间也可以串成一个chain一起处理。

下面这张图表明了flink是如何看待用户的处理流程的:用户操作被抽象化为一系列operator。以source开始,以sink结尾,中间的operator做的操作叫做transform,并且可以把几个操作串在一起执行。

Source ---> Transformation ----> Transformation ----> Sink

以下是一个样例代码,后续的分析会基于此代码。

DataStream<String> text = env.socketTextStream(hostname, port); DataStream counts = text .filter(new FilterClass()) .map(new LineSplitter()) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator) .keyBy(0) .timeWindow(Time.seconds(10)) .sum(2) counts.print() System.out.println(env.getExecutionPlan()); 2. 核心类/接口

在用户设计程序时候,对应如下核心类/接口

DataStream:描述的是一个具有相同数据类型的数据流,底层是通过具体的Transformation来实现,其负责提供各种对流上的数据进行操作转换的API接口。

Transformation:描述了构建一个DataStream的操作,以及该操作的并行度、输出数据类型等信息,并有一个属性,用来持有StreamOperator的一个具体实例;

上述代码逻辑中,对数据流做了如下操作:filter, map, keyBy, assignTimestampsAndWatermarks, timeWindow, sum。每次转换都生成了一个新的DataStream。

比如实例代码中的timeWindow最后生成了windowedStream。windowedStream之上执行的apply方法会生成了WindowOperator,初始化时包含了trigger以及allowedLateness的值。然后经过transform转换,实际上是执行了DataStream中的transform方法,最后生成了SingleOutputStreamOperator。SingleOutputStreamOperator这个类名字有点误导,实际上它是DataStream的子类。

public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { KeySelector<T, K> keySel = input.getKeySelector(); //根据keyedStream获取key WindowOperator<K, T, Iterable<T>, R, W> operator; operator = new WindowOperator<>(windowAssigner, ... , new InternalIterableWindowFunction<>(function), trigger, allowedLateness, legacyWindowOpType); return input.transform(opName, resultType, operator);//根据operator name,窗口函数的类型,以及window operator,执行keyedStream.transaform操作 } 0x04. Flink 执行图模型

Flink 中的执行图可以分成四层:StreamGraph ---> JobGraph ---> ExecutionGraph -> 物理执行图

StreamGraph:是对用户逻辑的映射,代表程序的拓扑结构,是根据用户通过 Stream API 编写的代码生成的最初的图。

JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。

ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。

物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。

我们这里重点看StreamGraph,其相关重点数据结构是:

StreamNode 是用来描述 operator 的逻辑节点,并具有所有相关的属性,如并发度、入边和出边等。

StreamEdge 是用来描述两个 StreamNode(operator) 逻辑的链接边。

我们可以直接打印 Execution Plan

System.out.println(env.getExecutionPlan());

其内部调用 StreamExecutionEnvironment.getExecutionPlan 得到 StreamGraph。

public String getExecutionPlan() { return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON(); }

StreamGraph的转换流是:

* Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html