这个nameOfInvokableClass是哪里生成的呢?其实早在生成StreamGraph的时候,这就已经确定了,见StreamGraph.addOperator方法
if (operatorObject instanceof StoppableStreamSource) { addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName); } else if (operatorObject instanceof StreamSource) { addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName); } else { addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName); }这里的OneInputStreamTask.class即为生成的StreamNode的vertexClass。这个值会一直传递
StreamGraph --> JobVertex.invokableClass --> ExecutionJobVertex.TaskInformation.invokableClassName --> Task 2. StreamTask是本地执行的基本单位,由TaskManagers部署执行,Task会调用 StreamTask。StreamTask包含了headOperator 和 operatorChain,封装了算子的处理逻辑。可以理解为,StreamTask是执行流程框架,OperatorChain(StreamOperator)是负责具体算子逻辑,嵌入到StreamTask的执行流程框架中。
直接从StreamTask的注释中,能看到StreamTask的生命周期。
其中,每个operator的open()方法都被StreamTask的openAllOperators()方法调用。该方法(指openAllOperators)执行所有的operational的初始化,例如使用定时器服务注册定时器。单个task可能正在执行多个operator,消耗其前驱的输出,在这种情况下,该open()方法在最后一个operator中调用,即这个operator的输出也是task本身的输出。这样做使得当第一个operator开始处理任务的输入时,它的所有下游operator都准备好接收其输出。
OperatorChain是在StreamTask的invoke方法中被创建的,在执行的时候,如果一个operator无法被chain起来,那它就只有headOperator,chain里就没有其他operator了。
注意: task中的连续operator是从最后到第一个依次open。
以OneInputStreamTask为例,Task的核心执行代码即为OneInputStreamTask.invoke方法,它会调用StreamTask.invoke方法。
* The life cycle of the task(StreamTask) is set up as follows: * {@code * -- setInitialState -> provides state of all operators in the chain * | * +----> 重新初始化task的state,并且在如下两种情况下尤为重要: * | 1. 当任务从故障中恢复并从最后一个成功的checkpoint点重新启动时 * | 2. 从一个保存点恢复时。 * -- invoke() * | * +----> Create basic utils (config, etc) and load the chain of operators * +----> operators.setup() //创建 operatorChain 并设置为 headOperator 的 Output * --------> openAllOperators() * +----> task specific init() * +----> initialize-operator-states() * +----> open-operators() //执行 operatorChain 中所有 operator 的 open 方法 * +----> run() //runMailboxLoop()方法将一直运行,直到没有更多的输入数据 * --------> mailboxProcessor.runMailboxLoop(); * --------> StreamTask.processInput() * --------> StreamTask.inputProcessor.processInput() * --------> 间接调用 operator的processElement()和processWatermark()方法 * +----> close-operators() //执行 operatorChain 中所有 operator 的 close 方法 * +----> dispose-operators() * +----> common cleanup * +----> task specific cleanup() * } 3. OneInputStreamTaskOneInputStreamTask是 StreamTask 的实现类之一,具有代表性。我们示例代码中基本都是由OneInputStreamTask来做具体执行。
看看OneInputStreamTask 是如何生成的?
* 生成StreamNode时候 * * -- StreamGraph.addOperator() * | * +----> addNode(... OneInputStreamTask.class, operatorObject, operatorName); * | 将 OneInputStreamTask 等 StreamTask 设置到 StreamNode 的节点属性中 * * * 在 JobVertex 的节点构造时也会做一次初始化 * | * +----> jobVertex.setInvokableClass(streamNode.getJobVertexClass());后续在 TaskDeploymentDescriptor 实例化的时候会获取 jobVertex 中的属性。
再看看OneInputStreamTask 的 init() 和run() 分别都做了什么
* OneInputStreamTask * class OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>> * {@code * -- init方法 * | * +----> 获取算子对应的输入序列化器 TypeSerializer * +----> CheckpointedInputGate inputGate = createCheckpointedInputGate(); * 获取输入数据 InputGate[],InputGate 是 flink 网络传输的核心抽象之一 * 其在内部封装了消息的接收和内存的管理,从 InputGate 可以拿到上游传送过来的数据 * +----> inputProcessor = new StreamOneInputProcessor<>(input,output,operatorChain) * | 1. StreamInputProcessor,是 StreamTask 内部用来处理 Record 的组件, * | 里面封装了外部 IO 逻辑【内存不够时将 buffer 吐到磁盘上】以及 时间对齐逻辑【Watermark】 * | 2. output 是 StreamTaskNetworkOutput, input是StreamTaskNetworkInput * | 这样就把input, output 他俩聚合进StreamOneInputProcessor * +----> headOperator.getMetricGroup().gauge * +----> getEnvironment().getMetricGroup().gauge * 设置一些 metrics 及 累加器 * * * -- run方法(就是基类StreamTask.run) * +----> StreamTask.runMailboxLoop * | 从 StreamTask.runMailboxLoop 开始,下面是一层层的调用关系 * -----> StreamTask.processInput() * -----> StreamTask.inputProcessor.processInput() * -----> StreamOneInputProcessor.processInput * -----> input.emitNext(output) * -----> StreamTaskNetworkInput.emitNext() * | while(true) {从输入source读取一个record, output是 StreamTaskNetworkOutput} * -----> StreamTaskNetworkInput.processElement() //具体处理record * | 根据StreamElement的不同类型做不同处理 * | if (recordOrMark.isRecord()) output.emitRecord() * ------------> StreamTaskNetworkOutput.emitRecord() * ----------------> operator.processElement(record) * | if (recordOrMark.isWatermark()) statusWatermarkValve.inputWatermark() * | if (recordOrMark.isLatencyMarker()) output.emitLatencyMarker() * | if (recordOrMark.isStreamStatus()) statusWatermarkValve.inputStreamStatus() 4. OperatorChainflink 中的一个 operator 代表一个最顶级的 api 接口,拿 streaming 来说就是,在 DataStream 上做诸如 map/reduce/keyBy 等操作均会生成一个算子。