OneInputStreamOperator与TwoInputStreamOperator接口。这两个接口非常类似,本质上就是处理流上存在的三种元素StreamRecord,Watermark和LatencyMarker。一个用作单流输入,一个用作双流输入。
6. StreamSourceStreamSource是用来开启整个流的算子(继承AbstractUdfStreamOperator)。StreamSource因为没有输入,所以没有实现InputStreamOperator的接口。比较特殊的是ChainingStrategy初始化为HEAD。
在StreamSource这个类中,在运行时由SourceStreamTask调用SourceFunction的run方法来启动source。
* class StreamSource<OUT, SRC extends SourceFunction<OUT>> * extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> * * * -- run() * | * +----> latencyEmitter = new LatencyMarksEmitter * | 用来产生延迟监控的LatencyMarker * +----> this.ctx = StreamSourceContexts.getSourceContext * | 据时间模式(EventTime/IngestionTime/ProcessingTime)生成相应SourceConext * | 包含了产生element关联的timestamp的方法和生成watermark的方法 * +----> userFunction.run(ctx); * | 调用SourceFunction的run方法来启动source,进行数据的转发 * public { //读到数据后,把数据交给collect方法,collect方法负责把数据交到合适的位置(如发布为br变量,或者交给下个operator,或者通过网络发出去) private transient SourceFunction.SourceContext<OUT> ctx; private transient volatile boolean canceledOrStopped = false; private transient volatile boolean hasSentMaxWatermark = false; public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer, final Output<StreamRecord<OUT>> collector, final OperatorChain<?, ?> operatorChain) throws Exception { userFunction.run(ctx); } } 7. StreamMapStreamFilter,StreamMap与StreamFlatMap算子在实现的processElement分别调用传入的FilterFunction,MapFunction, FlatMapFunction的udf将element传到下游。这里用StreamMap举例:
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { public StreamMap(MapFunction<IN, OUT> mapper) { super(mapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void processElement(StreamRecord<IN> element) throws Exception { output.collect(element.replace(userFunction.map(element.getValue()))); } } 8. WindowOperatorFlink通过水位线分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事件流中注入水位线。
我们示例代码中,timeWindow()最终对应了WindowStream,窗口算子WindowOperator是窗口机制的底层实现。assignTimestampsAndWatermarks 则对应了TimestampsAndPeriodicWatermarksOperator算子,它把产生的Watermark传递给了WindowOperator。
元素在streaming dataflow引擎中流动到WindowOperator时,会被分为两拨,分别是普通事件和水位线。
如果是普通的事件,则会调用processElement方法进行处理,在processElement方法中,首先会利用窗口分配器为当前接收到的元素分配窗口,接着会调用触发器的onElement方法进行逐元素触发。对于时间相关的触发器,通常会注册事件时间或者处理时间定时器,这些定时器会被存储在WindowOperator的处理时间定时器队列和水位线定时器队列中,如果触发的结果是FIRE,则对窗口进行计算。
如果是水位线(事件时间场景),则方法processWatermark将会被调用,它将会处理水位线定时器队列中的定时器。如果时间戳满足条件,则利用触发器的onEventTime方法进行处理。
而对于处理时间的场景,WindowOperator将自身实现为一个基于处理时间的触发器,以触发trigger方法来消费处理时间定时器队列中的定时器满足条件则会调用窗口触发器的onProcessingTime,根据触发结果判断是否对窗口进行计算。
* public class WindowOperator<K, IN, ACC, OUT, W extends Window> * extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> * implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> * * -- processElement() * | * +----> windowAssigner.assignWindows * | //通过WindowAssigner为element分配一系列windows * +----> windowState.add(element.getValue()) * | //把当前的element加入buffer state * +----> TriggerResult triggerResult = triggerContext.onElement(element) * | //触发onElment,得到triggerResult * +----> Trigger.OnMergeContext.onElement() * +----> trigger.onElement(element.getValue(), element.getTimestamp(), window,...) * +----> EventTimeTriggers.onElement() * | //如果当前window.maxTimestamp已经小于CurrentWatermark,直接触发 * | //否则将window.maxTimestamp注册到TimeService中,等待触发 * +----> contents = windowState.get(); emitWindowContents(actualWindow, contents) * | //对triggerResult做各种处理,如果fire,真正去计算窗口中的elements * -- processWatermark() * -----> 最终进入基类AbstractStreamOperator.processWatermark * -----> AbstractStreamOperator.processWatermark(watermark) * -----> timeServiceManager.advanceWatermark(mark); 第一步处理watermark * -----> output.emitWatermark(mark) 第二步将watermark发送到下游 * -----> InternalTimeServiceManager.advanceWatermark 0x06. 处理 Watermark 的简要流程