在生成了程序逻辑之后,Env里面就有了 一系列 transformation(每个transformation里面记录了自己对应的物理 operator,比如StreamMap,WindowOperator),这个是后面生成计算图的基础。
当调用env.execute时,通过StreamGraphGenerator.generate遍历其中的transformation集合构造出StreamGraph。
2. 生成计算图我们这里重点介绍StreamGraph以及如何生成,JobGraph,ExecutionGraph只是简介。
StreamGraph代表程序的拓扑结构,是从用户代码直接生成的图。StreamOperator是具体的物理算子。
一个很重要的点是,把 SourceStreamTask / OneInputStreamTask 添加到StreamNode上,作为 jobVertexClass,这个是真实计算的部分。
StreamOperator是一个接口。StreamOperator 是 数据流操作符的基础接口,该接口的具体实现子类中,会有保存用户自定义数据处理逻辑的函数的属性,负责对userFunction的调用,以及调用时传入所需参数,比如在StreamSource这个类中,在调用SourceFunction的run方法时,会构建一个SourceContext的具体实例,作为入参,用于run方法中,进行数据的转发;
StreamOperator PublicEvolving public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable { } AbstractStreamOperatorAbstractStreamOperator抽象类实现了StreamOperator。在AbstractStreamOperator中有一些重要的成员变量,总体来说可以分为几类,一类是运行时相关的,一类是状态相关的,一类是配置相关的,一类是时间相关的,还有一类是监控相关的。
@PublicEvolving public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable { protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD; private transient StreamTask<?, ?> container; protected transient StreamConfig config; protected transient Output<StreamRecord<OUT>> output; private transient StreamingRuntimeContext runtimeContext; public void processWatermark(Watermark mark) throws Exception { if (timeServiceManager != null) { timeServiceManager.advanceWatermark(mark); //第一步处理watermark } output.emitWatermark(mark);//第二步,将watermark发送到下游 } } AbstractUdfStreamOperatorAbstractUdfStreamOperator抽象类继承了AbstractStreamOperator,对其部分方法做了增强,多了一个成员变量UserFunction。提供了一些通用功能,比如把context赋给算子,保存快照等等。此外还实现了OutputTypeConfigurable接口的setOutputType方法对输出数据的类型做了设置。
@PublicEvolving public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> { protected final F userFunction;/** The user function. */ } KeyedProcessOperator & WindowOperator。KeyedStream,WindowedStream分别对应KeyedProcessOperator,WindowOperator。
@Internal public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> { protected final WindowAssigner<? super IN, W> windowAssigner; private final KeySelector<IN, K> keySelector; private final Trigger<? super IN, ? super W> trigger; private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor; protected final TypeSerializer<K> keySerializer; protected final TypeSerializer<W> windowSerializer; } @Internal public class KeyedProcessOperator<K, IN, OUT> extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> { private transient TimestampedCollector<OUT> collector; private transient ContextImpl context; private transient OnTimerContextImpl onTimerContext; @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); TimerService timerService = new SimpleTimerService(internalTimerService); context = new ContextImpl(userFunction, timerService); onTimerContext = new OnTimerContextImpl(userFunction, timerService); } @Override public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception { collector.setAbsoluteTimestamp(timer.getTimestamp()); invokeUserFunction(TimeDomain.EVENT_TIME, timer); } @Override public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception { collector.eraseTimestamp(); invokeUserFunction(TimeDomain.PROCESSING_TIME, timer); } @Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); context.element = element; userFunction.processElement(element.getValue(), context, collector); context.element = null; } private void invokeUserFunction( TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer) throws Exception { onTimerContext.timeDomain = timeDomain; onTimerContext.timer = timer; userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); onTimerContext.timeDomain = null; onTimerContext.timer = null; } } OneInputStreamOperator & TwoInputStreamOperator