[源码分析] 从源码入手看 Flink Watermark 之传播过程 (18)

如果是水位线(事件时间场景),则方法processWatermark将会被调用,它将会处理水位线定时器队列中的定时器。如果时间戳满足条件,则利用触发器的onEventTime方法进行处理。processWatermark 用来处理上游发送过来的watermark,可以认为不做任何处理,下游的watermark只与其上游最近的生成方式相关。

WindowOperator内部有触发器上下文对象接口的实现——Context,它主要提供了三种类型的方法:

提供状态存储与访问;

定时器的注册与删除;

窗口触发器process系列方法的包装;

在注册定时器时,会新建定时器对象并将其加入到定时器队列中。等到时间相关的处理方法(processWatermark和trigger)被触发调用,则会从定时器队列中消费定时器对象并调用窗口触发器,然后根据触发结果来判断是否触动窗口的计算。

@Internal public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> { protected final WindowAssigner<? super IN, W> windowAssigner; protected transient TimestampedCollector<OUT> timestampedCollector; protected transient Context triggerContext = new Context(null, null); //触发器上下文对象 protected transient WindowContext processContext; protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;

无论是windowOperator还是KeyedProcessOperator都持有InternalTimerService具体实现的对象,通过这个对象用户可以注册EventTime及ProcessTime的timer,当watermark 越过这些timer的时候,调用回调函数执行一定的操作。

window operator通过WindowAssigner和Trigger来实现它的逻辑。当一个element到达时,通过KeySelector先assign一个key,并且通过WindowAssigner assign若干个windows(指定element分配到哪个window去),这样这个element会被放入若干个pane。一个pane会存放所有相同key和相同window的elements。

比如 SlidingEventTimeWindows 的实现。

* public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> Collection<TimeWindow> assignWindows(Object element, long timestamp, ...) { List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { //可以看到这里会assign多个TimeWindow,因为是slide windows.add(new TimeWindow(start, start + size)); } return windows; }

再比如 TumblingProcessingTimeWindows

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { Collection<TimeWindow> assignWindows(Object element, long timestamp, ...) { final long now = context.getCurrentProcessingTime(); long start = now - (now % size); //很简单,分配一个TimeWindow return Collections.singletonList(new TimeWindow(start, start + size)); } processWatermark

首先看看处理Watermark

public void processWatermark(Watermark mark) throws Exception { //定义一个标识,表示是否仍有定时器满足触发条件 boolean fire; do { //从水位线定时器队列中查找队首的一个定时器,注意此处并不是出队(注意跟remove方法的区别) Timer<k, w=""> timer = watermarkTimersQueue.peek(); //如果定时器存在,且其时间戳戳不大于水位线的时间戳 //(注意理解条件是:不大于,水位线用于表示小于该时间戳的元素都已到达,所以所有不大于水位线的触发时间戳都该被触发) if (timer != null && timer.timestamp <= mark.getTimestamp()) { //置标识为真,表示找到满足触发条件的定时器 fire = true; //将该元素从队首出队 watermarkTimers.remove(timer); watermarkTimersQueue.remove(); //构建新的上下文 context.key = timer.key; context.window = timer.window; setKeyContext(timer.key); //窗口所使用的状态存储类型为可追加的状态存储 AppendingState<in, acc=""> windowState; MergingWindowSet<w> mergingWindows = null; //如果分配器是合并分配器(比如会话窗口) if (windowAssigner instanceof MergingWindowAssigner) { //获得合并窗口帮助类MergingWindowSet的实例 mergingWindows = getMergingWindowSet(); //获得当前窗口对应的状态窗口(状态窗口对应着状态后端存储的命名空间) W stateWindow = mergingWindows.getStateWindow(context.window); //如果没有对应的状态窗口,则跳过本次循环 if (stateWindow == null) { continue; } //获得当前窗口对应的状态表示 windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { //如果不是合并分配器,则直接获取窗口对应的状态表示 windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } //从窗口状态表示中获得窗口中所有的元素 ACC contents = windowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } //通过上下文对象调用窗口触发器的事件时间处理方法并获得触发结果对象 TriggerResult triggerResult = context.onEventTime(timer.timestamp); //如果触发的结果是FIRE(触动窗口计算),则调用fire方法进行窗口计算 if (triggerResult.isFire()) { fire(context.window, contents); } //而如果触动的结果是清理窗口,或者事件时间等于窗口的清理时间(通常为窗口的maxTimestamp属性) if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { //清理窗口及元素 cleanup(context.window, windowState, mergingWindows); } } else { //队列中没有符合条件的定时器,置标识为否,终止循环 fire = false; } } while (fire); //向下游发射水位线,把waterMark传递下去 output.emitWatermark(mark); //更新currentWaterMark, 将当前算子的水位线属性用新水位线的时间戳覆盖 this.currentWatermark = mark.getTimestamp(); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html