以上方法虽然冗长但流程还算清晰,其中的fire方法用于对窗口进行计算,它会调用内部窗口函数(即InternalWindowFunction,它包装了WindowFunction)的apply方法。
processElement处理element到达的逻辑,将当前的element的value加到对应的window中,触发onElement
public void processElement(StreamRecord<IN> element) throws Exception { Collection<W> elementWindows = windowAssigner.assignWindows( //通过WindowAssigner为element分配一系列windows element.getValue(), element.getTimestamp(), windowAssignerContext); final K key = (K) getStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { //如果是MergingWindow //....... } else { //如果是普通window for (W window: elementWindows) { // drop if the window is already late if (isLate(window)) { //late data的处理,默认是丢弃 continue; } AppendingState<IN, ACC> windowState = getPartitionedState( //从backend中取出该window的状态,就是buffer的element window, windowSerializer, windowStateDescriptor); windowState.add(element.getValue()); //把当前的element加入buffer state context.key = key; context.window = window; //context的设计相当tricky和晦涩 TriggerResult triggerResult = context.onElement(element); //触发onElment,得到triggerResult if (triggerResult.isFire()) { //对triggerResult做各种处理 ACC contents = windowState.get(); if (contents == null) { continue; } fire(window, contents); //如果fire,真正去计算窗口中的elements } if (triggerResult.isPurge()) { cleanup(window, windowState, null); //purge,即去cleanup elements } else { registerCleanupTimer(window); } } } }判断是否是late data的逻辑
protected boolean isLate(W window) { return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark)); }而isCleanupTime和cleanup这对方法主要涉及到窗口的清理。如果当前窗口是时间窗口,且窗口的时间到达了清理时间,则会进行清理窗口清理。那么清理时间如何判断呢?Flink是通过窗口的最大时间戳属性结合允许延迟的时间联合计算的
private long cleanupTime(W window) { //清理时间被预置为窗口的最大时间戳加上允许的延迟事件 long cleanupTime = window.maxTimestamp() + allowedLateness; //如果窗口为非时间窗口(其maxTimestamp属性值为Long.MAX_VALUE),则其加上允许延迟的时间, //会造成Long溢出,从而会变成负数,导致cleanupTime < window.maxTimestamp 条件成立, //则直接将清理时间设置为Long.MAX_VALUE return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; } trigger这个是用来触发onProcessingTime,这个需要依赖系统时间的定时器来触发,逻辑和processWatermark基本等同,只是触发条件不一样
@Override public void trigger(long time) throws Exception { boolean fire; //Remove information about the triggering task processingTimeTimerFutures.remove(time); processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time)); do { Timer<K, W> timer = processingTimeTimersQueue.peek(); if (timer != null && timer.timestamp <= time) { fire = true; processingTimeTimers.remove(timer); processingTimeTimersQueue.remove(); context.key = timer.key; context.window = timer.window; setKeyContext(timer.key); AppendingState<IN, ACC> windowState; MergingWindowSet<W> mergingWindows = null; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); if (stateWindow == null) { // then the window is already purged and this is a cleanup // timer set due to allowed lateness that has nothing to clean, // so it is safe to just ignore continue; } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } ACC contents = windowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); if (triggerResult.isFire()) { fire(context.window, contents); } if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { cleanup(context.window, windowState, mergingWindows); } } else { fire = false; } } while (fire); } 0x08 参考Flink - watermark
Stream指南七 理解事件时间与Watermarks
Flink运行时之流处理程序生成流图
Flink原理与实现:如何生成ExecutionGraph及物理执行图
Flink timer注册与watermark触发
Apache Flink源码解析 (四)Stream Operator
Flink流处理之窗口算子分析
Flink 原理与实现:如何生成 StreamGraph
Flink中Watermark定时生成源码分析
追源索骥:透过源码看懂Flink核心框架的执行流程
Flink运行时之流处理程序生成流图
Apache Flink 进阶(六):Flink 作业执行深度解析
调试Windows和事件时间
Flink最佳实践(二)Flink流式计算系统
Streaming System 第三章:Watermarks
Apache Flink源码解析之stream-source
Flink源码系列——Flink中一个简单的数据处理功能的实现过程
Flink中task之间的数据交换机制
Flink task之间的数据交换
[Flink架构(二)- Flink中的数据传输](https://www.cnblogs.com/zackstang/p/10949559.html)
Flink的数据抽象及数据交换过程
聊聊flink的Execution Plan Visualization
Flink 原理与实现:如何生成 StreamGraph
Flink源码系列——获取StreamGraph的过程
Flink源码系列——Flink中一个简单的数据处理功能的实现过程
Flink源码解读系列1——分析一个简单Flink程序的执行过程
Flink timer注册与watermark触发[转载自网易云音乐实时计算平台经典实践知乎专栏]
[Flink – process watermark](https://www.cnblogs.com/fxjwind/p/7657058.html)
Flink流计算编程--Flink中allowedLateness详细介绍及思考
「Spark-2.2.0」Structured Streaming - Watermarking操作详解
Flink window机制
Flink – window operator
flink的window计算、watermark、allowedLateness、trigger
Apache Flink源码解析 (四)Stream Operator
Flink - watermark生成
Flink入门教程--Task Lifecycle(任务的生命周期简介)
Flink 原理与实现:Operator Chain原理
Flink算子的生命周期
Flink原理(三)——Task(任务)、Operator Chain(算子链)和Slot(资源)
Flink – Stream Task执行过程
Flink Slot详解与Job Execution Graph优化