[源码分析] 从源码入手看 Flink Watermark 之传播过程 (14)

回到实例代码,filter,map是在StreamTask中执行,可以看看StreamTask等具体定义。

@Internal public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler { private final StreamTaskActionExecutor actionExecutor; /** * The input processor. Initialized in {@link #init()} method. */ @Nullable protected StreamInputProcessor inputProcessor; // 这个是处理关键。 /** the head operator that consumes the input streams of this task. */ protected OP headOperator; /** The chain of operators executed by this task. */ protected OperatorChain<OUT, OP> operatorChain; /** The configuration of this streaming task. */ protected final StreamConfig configuration; /** Our state backend. We use this to create checkpoint streams and a keyed state backend. */ protected StateBackend stateBackend; /** The external storage where checkpoint data is persisted. */ private CheckpointStorageWorkerView checkpointStorage; /** * The internal {@link TimerService} used to define the current * processing time (default = {@code System.currentTimeMillis()}) and * register timers for tasks to be executed in the future. */ protected TimerService timerService; private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler; /** The map of user-defined accumulators of this task. */ private final Map<String, Accumulator<?, ?>> accumulatorMap; /** The currently active background materialization threads. */ private final CloseableRegistry cancelables = new CloseableRegistry(); private final StreamTaskAsyncExceptionHandler asyncExceptionHandler; /** * Flag to mark the task "in operation", in which case check needs to be initialized to true, * so that early cancel() before invoke() behaves correctly. */ private volatile boolean isRunning; /** Flag to mark this task as canceled. */ private volatile boolean canceled; private boolean disposedOperators; /** Thread pool for async snapshot workers. */ private ExecutorService asyncOperationsThreadPool; private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter; protected final MailboxProcessor mailboxProcessor; private Long syncSavepointId = null; @Override public final void invoke() throws Exception { try { beforeInvoke(); // final check to exit early before starting to run if (canceled) { throw new CancelTaskException(); } // let the task do its work isRunning = true; runMailboxLoop(); //MailboxProcessor.runMailboxLoop会调用StreamTask.processInput // if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted if (canceled) { throw new CancelTaskException(); } afterInvoke(); } finally { cleanUpInvoke(); } } protected void processInput(MailboxDefaultAction.Controller controller) throws Exception { InputStatus status = inputProcessor.processInput(); // 这里会具体从source读取数据。 if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) { return; } if (status == InputStatus.END_OF_INPUT) { controller.allActionsCompleted(); return; } //具体执行操作。 CompletableFuture<?> jointFuture = getInputOutputJointFuture(status); MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction(); jointFuture.thenRun(suspendedDefaultAction::resume); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html