[Source Code Analysis] Tracing Flink Watermark Propagation Through the Source Code (9)

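In the sample code, operations such as "assignTimestampsAndWatermarks", "Filter", and "Map" are each converted into a SingleOutputStreamOperator, on which the user continues to apply processing logic. The class name SingleOutputStreamOperator is somewhat misleading: it is not an operator but a subclass of DataStream.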

    @Public
    public class DataStream<T> {

        protected final StreamExecutionEnvironment environment;

        protected final Transformation<T> transformation;

        // The assignTimestampsAndWatermarks operation likewise produces a SingleOutputStreamOperator.
        public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
                AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

            final int inputParallelism = getTransformation().getParallelism();
            final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

            TimestampsAndPeriodicWatermarksOperator<T> operator =
                    new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

            return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                    .setParallelism(inputParallelism);
        }

        // Map is backed by StreamMap, a OneInputStreamOperator.
        public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
            return transform("Map", outputType, new StreamMap<>(clean(mapper)));
        }

        // Note: the two calls above pass an operator instance; the transform overload
        // that accepts an operator directly is not shown in this excerpt.
        @PublicEvolving
        public <R> SingleOutputStreamOperator<R> transform(
                String operatorName,
                TypeInformation<R> outTypeInfo,
                OneInputStreamOperatorFactory<T, R> operatorFactory) {
            return doTransform(operatorName, outTypeInfo, operatorFactory);
        }

        protected <R> SingleOutputStreamOperator<R> doTransform(
                String operatorName,
                TypeInformation<R> outTypeInfo,
                StreamOperatorFactory<R> operatorFactory) {

            // read the output type of the input Transform to coax out errors about MissingTypeInfo
            transformation.getOutputType();

            OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

            // SingleOutputStreamOperator is actually a subclass of DataStream;
            // the "Operator" in its name is easy to misread.
            @SuppressWarnings({"unchecked", "rawtypes"})
            SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

            // This simply registers the Transformation with the execution environment.
            getExecutionEnvironment().addOperator(resultTransform);

            return returnStream;
        }
    }
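
The flow through doTransform can be sketched without any Flink dependency. The following is a minimal model, not Flink's implementation: every class name here (SketchEnv, SketchTransformation, SketchDataStream, SketchSingleOutputStream) is a hypothetical stand-in, and operators, types, and parallelism are left out. It only illustrates the three steps visible in the excerpt: build a transformation that points at its input, register it with the environment, and return a new stream object that is itself a DataStream subclass.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Transformation: a named node that remembers its input.
class SketchTransformation {
    final String name;
    final SketchTransformation input; // null for a source

    SketchTransformation(String name, SketchTransformation input) {
        this.name = name;
        this.input = input;
    }
}

// Hypothetical stand-in for the execution environment: it only collects transformations.
class SketchEnv {
    final List<SketchTransformation> transformations = new ArrayList<>();

    void addOperator(SketchTransformation transformation) {
        transformations.add(transformation);
    }
}

// Hypothetical stand-in for DataStream.
class SketchDataStream {
    protected final SketchEnv env;
    protected final SketchTransformation transformation;

    SketchDataStream(SketchEnv env, SketchTransformation transformation) {
        this.env = env;
        this.transformation = transformation;
    }

    // Mirrors the shape of doTransform: link to the input, register, wrap in a new stream.
    SketchSingleOutputStream transform(String operatorName) {
        SketchTransformation result = new SketchTransformation(operatorName, this.transformation);
        env.addOperator(result);
        return new SketchSingleOutputStream(env, result);
    }
}

// Despite the "operator"-like role of its Flink namesake, this is just a stream subclass.
class SketchSingleOutputStream extends SketchDataStream {
    SketchSingleOutputStream(SketchEnv env, SketchTransformation transformation) {
        super(env, transformation);
    }
}

class TransformSketch {
    public static void main(String[] args) {
        SketchEnv env = new SketchEnv();
        SketchDataStream source = new SketchDataStream(env, new SketchTransformation("Source", null));

        source.transform("Timestamps/Watermarks").transform("Filter").transform("Map");

        // Each registered node points back at the node it was derived from.
        for (SketchTransformation t : env.transformations) {
            System.out.println(t.name + " <- " + t.input.name);
        }
    }
}
```

In this sketch the source node is never passed to addOperator; it stays reachable only through the input links of the nodes built on top of it.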

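In the sample code, the vast majority of logical operators are converted into a OneInputTransformation. Each Transformation indirectly records its corresponding physical operator (through an operator factory) and is registered with the Env.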

    // OneInputTransformation corresponds to a single-input operator.
    @Internal
    public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {

        private final Transformation<IN> input;

        // This indirectly records the physical operator behind this Transformation, e.g. StreamMap.
        private final StreamOperatorFactory<OUT> operatorFactory;

        private KeySelector<IN, ?> stateKeySelector;

        private TypeInformation<?> stateKeyType;

        public OneInputTransformation(
                Transformation<IN> input,
                String name,
                OneInputStreamOperator<IN, OUT> operator, // e.g. StreamMap
                TypeInformation<OUT> outputType,
                int parallelism) {
            this(input, name, SimpleOperatorFactory.of(operator), outputType, parallelism);
        }
    }
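
What "indirectly records" means can be shown with a small stand-alone sketch. The names below (SketchOperator, SketchOperatorFactory, SketchSimpleFactory, SketchOneInputTransformation) are hypothetical and only imitate the roles of OneInputStreamOperator, StreamOperatorFactory, SimpleOperatorFactory, and OneInputTransformation; the real classes carry far more state. The point is the constructor chaining seen in the excerpt: a caller may hand over a concrete operator, but the transformation stores only a factory, so the operator is always reached through one extra hop.

```java
// Hypothetical stand-in for a one-input operator such as StreamMap.
interface SketchOperator<IN, OUT> {
    OUT processElement(IN value);
}

// Hypothetical stand-in for an operator factory: it hands out the operator on request.
interface SketchOperatorFactory<IN, OUT> {
    SketchOperator<IN, OUT> createOperator();
}

// Imitates the role of SimpleOperatorFactory.of(operator): wrap an existing instance.
final class SketchSimpleFactory<IN, OUT> implements SketchOperatorFactory<IN, OUT> {
    private final SketchOperator<IN, OUT> operator;

    private SketchSimpleFactory(SketchOperator<IN, OUT> operator) {
        this.operator = operator;
    }

    static <IN, OUT> SketchSimpleFactory<IN, OUT> of(SketchOperator<IN, OUT> operator) {
        return new SketchSimpleFactory<>(operator);
    }

    @Override
    public SketchOperator<IN, OUT> createOperator() {
        return operator;
    }
}

// The transformation keeps the factory, never the operator itself.
final class SketchOneInputTransformation<IN, OUT> {
    private final String name;
    private final SketchOperatorFactory<IN, OUT> operatorFactory;

    // Convenience constructor: wrap the operator, then delegate.
    SketchOneInputTransformation(String name, SketchOperator<IN, OUT> operator) {
        this(name, SketchSimpleFactory.of(operator));
    }

    SketchOneInputTransformation(String name, SketchOperatorFactory<IN, OUT> operatorFactory) {
        this.name = name;
        this.operatorFactory = operatorFactory;
    }

    String getName() {
        return name;
    }

    SketchOperatorFactory<IN, OUT> getOperatorFactory() {
        return operatorFactory;
    }
}

class FactorySketch {
    public static void main(String[] args) {
        SketchOperator<String, Integer> lengthOp = s -> s.length();
        SketchOneInputTransformation<String, Integer> map =
                new SketchOneInputTransformation<>("Map", lengthOp);

        // The operator is recovered through the factory stored in the transformation.
        SketchOperator<String, Integer> recovered = map.getOperatorFactory().createOperator();
        System.out.println(map.getName() + " -> " + recovered.processElement("watermark"));
    }
}
```

Storing a factory instead of an instance leaves room for factories that build the operator later; this sketch only covers the simple wrapping case shown in the excerpt.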

Back to the sample code: DataStream.keyBy returns a KeyedStream, and KeyedStream.timeWindow returns a WindowedStream. Along the way, the corresponding Transformations are registered with the Env internally.

Internally, a WindowedStream corresponds to a WindowOperator. Note, however, that WindowedStream is not a subclass of DataStream. Instead, it holds a KeyedStream as a member variable.

    // Surprisingly, this is not a subclass of DataStream; it holds a KeyedStream as a member variable instead.
    @Public
    public class WindowedStream<T, K, W extends Window> {

        private final KeyedStream<T, K> input; // The DataStream (a KeyedStream) is held here.

        private final WindowAssigner<? super T, W> windowAssigner;

        private Trigger<? super T, ? super W> trigger;

        private Evictor<? super T, ? super W> evictor;

        private long allowedLateness = 0L;

        private OutputTag<T> lateDataOutputTag;

        // reduce, fold, and similar functions follow the same pattern.
        private <R> SingleOutputStreamOperator<R> apply(
                InternalWindowFunction<Iterable<T>, R, K, W> function,
                TypeInformation<R> resultType,
                Function originalFunction) {

            final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
            KeySelector<T, K> keySel = input.getKeySelector();

            WindowOperator<K, T, Iterable<T>, R, W> operator;

            // (The branch taken when an evictor is set is not shown in this excerpt.)
            ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                    input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            // The WindowOperator is created directly here.
            operator = new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);

            return input.transform(opName, resultType, operator);
        }
    }
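
The composition-over-inheritance design can also be sketched in isolation. Everything below is a hypothetical simplification (WinEnv, WinDataStream, WinKeyedStream, WinWindowedStream are invented names, and the operator is reduced to a string label); it assumes only what the excerpt shows, namely that the windowed stream keeps the keyed stream as a field and that apply delegates to input.transform.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical environment that records the names of registered operators.
class WinEnv {
    final List<String> registered = new ArrayList<>();
}

// Hypothetical stand-in for DataStream: transform registers the operator and returns a new stream.
class WinDataStream {
    protected final WinEnv env;

    WinDataStream(WinEnv env) {
        this.env = env;
    }

    WinDataStream transform(String operatorName) {
        env.registered.add(operatorName);
        return new WinDataStream(env);
    }
}

// Hypothetical stand-in for KeyedStream: a real stream subclass.
class WinKeyedStream extends WinDataStream {
    final String keyField;

    WinKeyedStream(WinEnv env, String keyField) {
        super(env);
        this.keyField = keyField;
    }

    WinWindowedStream timeWindow(long sizeMillis) {
        return new WinWindowedStream(this, sizeMillis);
    }
}

// Not a stream subclass: it wraps the keyed stream and only describes the window.
final class WinWindowedStream {
    private final WinKeyedStream input;
    private final long sizeMillis;

    WinWindowedStream(WinKeyedStream input, long sizeMillis) {
        this.input = input;
        this.sizeMillis = sizeMillis;
    }

    // Builds the operator description, then delegates to the wrapped stream's transform.
    WinDataStream apply(String functionName) {
        String opName = "Window(" + sizeMillis + "ms, " + functionName + ")";
        return input.transform(opName);
    }
}

class WindowedStreamSketch {
    public static void main(String[] args) {
        WinEnv env = new WinEnv();
        WinWindowedStream windowed = new WinKeyedStream(env, "userId").timeWindow(5000L);

        WinDataStream result = windowed.apply("sum");

        System.out.println(env.registered);                  // [Window(5000ms, sum)]
        System.out.println(result instanceof WinDataStream); // true
    }
}
```

One consequence of this design, at least in the sketch, is that ordinary stream methods are simply unavailable on the windowed object: the only way forward is a window function such as apply, which hands back a regular stream.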

Please credit the source when reposting: https://www.heiqu.com/zzwfyx.html