[源码分析] 从源码入手看 Flink Watermark 之传播过程 (4)

下面是我把 示例代码 打印StreamGraph结果整理出来一个静态架构。可以看出代码中的转换被翻译成了如下执行Unit(在下面图中,其执行序列是由上而下)。

* +-----> Data Source(ID = 1) [ Source Socket Stream ] * | // env.socketTextStream(hostname, port) 方法中生成了一个 Data Source * | * +-----> Operator(ID = 2) [ Filter ] * | * | * +-----> Operator(ID = 3) [ Map ] * | * | * +-----> Operator(ID = 4) [ Timestamps/Watermarks ] * | * | * +-----> Operator(ID = 6) [ Window(SumAggregator) ] * | // 多个Operator被构建成 Operator Chain * | * | * +-----> Data Sink(ID = 7) [ Sink : Print to Std. Out ] * // counts.print() 是在数据流最后添加了个 Data Sink,用于承接统计结果

示例代码中,Flink生成StreamGraph的大致处理流程是:

首先处理的Source,生成了Source的StreamNode。

处理Filter,生成了Filter的StreamNode,并生成StreamEdge连接上游Source和Filter。

处理Map,生成了Map的StreamNode,并生成StreamEdge连接上游Filter和Map。

处理assignTimestampsAndWatermarks,生成了Timestamps/Watermarks的StreamNode,并生成StreamEdge连接上游Map和Timestamps/Watermarks。

处理keyBy/timeWindow/sum,生成了Window的StreamNode 以及 Operator Chain,并生成StreamEdge连接上游Timestamps/Watermarks和Window。

最后处理Sink,创建Sink的StreamNode,并生成StreamEdge与上游Window相连。

0x05. 执行模块生命周期

这里主要核心类是:

Function:用户通过继承该接口的不同子类来实现用户自己的数据处理逻辑。如子类SocketTextStreamFunction实现从指定hostname和port来接收数据,并转发字符串的逻辑;

Task: 是Flink中执行的基本单位,代表一个 TaskManager 中所起的并行子任务,执行封装的 flink 算子并运行,提供以下服务:消费输入data、生产 IntermediateResultPartition [ flink关于中间结果的抽象 ]、与 JobManager 交互。

StreamTask : 是本地执行的基本单位,由TaskManagers部署执行。包含了多个StreamOperator,封装了算子的处理逻辑。

StreamOperator:DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。

StreamSource 是StreamOperator接口的一个具体实现类,其构造函数入参就是SourceFunction的子类,这里就是SocketTextStreamFunction的实例。

Task 是直接受 TaskManager 管理和调度的,而 Task 又会调用 StreamTask(主要是其各种子类),StreamTask 中封装了算子(StreamOperator)的处理逻辑。StreamSource是用来开启整个流的算子。我们接下来就说说动态逻辑。

我们的示例代码中,所有程序逻辑都是运行在StreamTask(主要是其各种子类)中,filter/map对应了StreamOperator;assignTimestampsAndWatermarks用来生成Watermarks,传递给下游的.keyBy.timeWindow(WindowOperator)。而keyBy/timeWindow/sum又被构建成OperatorChain。所以我们下面就逐一讲解这些概念。

1. Task

Task,它是在线程中执行的Runable对象,每个Task都是由一组Operators Chaining在一起的工作集合,Flink Job的执行过程可看作一张DAG图,Task是DAG图上的顶点(Vertex),顶点之间通过数据传递方式相互链接构成整个Job的Execution Graph。

Task 是直接受 TaskManager 管理和调度的,Flink最后通过RPC方法提交task,实际会调用到TaskExecutor.submitTask方法中。这个方法会创建真正的Task,然后调用task.startTaskThread();开始task的执行。而startTaskThread方法,则会执行executingThread.start,从而调用Task.run方法。
它的最核心的代码如下:

* public class Task implements Runnable... * The Task represents one execution of a parallel subtask on a TaskManager. * A Task wraps a Flink operator (which may be a user function) and runs it * * -- doRun() * | * +----> 从 NetworkEnvironment 中申请 BufferPool * | 包括 InputGate 的接收 pool 以及 task 的每个 ResultPartition 的输出 pool * +----> invokable = loadAndInstantiateInvokable(userCodeClassLoader, * | nameOfInvokableClass) 通过反射创建 * | load and instantiate the task's invokable code * | invokable即为operator对象实例,例如OneInputStreamTask,SourceStreamTask等 * | OneInputStreamTask继承了StreamTask,这里实际调用的invoke()方法是StreamTask里的 * +----> invokable.invoke() * | run the invokable, * | * | * OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>>

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html