默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。
简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。
5. 处理消息过程windowoperator接到消息以后,首先存到state,存放的格式为k,v,key的格式是key + window,value是key和window对应的数据。
注册一个timer,timer的数据结构为 [key,window,window边界 - 1],将timer放到集合中去。
当windowoperator收到watermark以后,取出集合中小于watermark的timer,触发其window。触发的过程中将state里面对应key及window的数据取出来,这里要经过序列化的过程,发送给windowfunction计算。
数据发送给windowfunction,实现windowfunction的window数据计算逻辑。
比如某窗口有三个数据:[key A, window A, 0], [key A, window A, 4999], [key A, window A, 5000]
对于固定窗口,当第一个watermark (Watermark 5000)到达时候,[key A, window A, 0], [key A, window A, 4999] 会被计算,当第二个watermark (Watermark 9999)到达时候,[key A, window A, 5000]会被计算。
6. 累加(再次)计算watermark是全局性的参数,用于管理消息的乱序,watermark超过window的endtime之后,就会触发窗口计算。一般情况下,触发窗口计算之后,窗口就销毁掉了,后面再来的数据也不会再计算。
因为加入了allowedLateness,所以计算会和之前不同了。window这个allowedLateness属性,默认为0,如果allowedLateness > 0,那么在某一个特定watermark到来之前,这个触发过计算的窗口还会继续保留,这个保留主要是窗口里的消息。
这个特定的watermark是什么呢? watermark-allowedLateness>=窗口endtime。这个特定watermark来了之后,窗口就要消失了,后面再来属于这个窗口的消息,就丢掉了。在 "watermark(=窗口endtime)" ~ “watermark(=endtime+allowedLateness)" 这段时间之间,对应窗口可能会多次计算。那么要window的endtime+allowedLateness <= watermark的时候,window才会被清掉。
比如window的endtime是5000,allowedLateness=0,那么如果watermark 5000到来之后,这个window就应该被清除。但是如果allowedLateness = 1000,则需要等water 6000(endtime + allowedLateness)到来之后,这个window才会被清掉。
Flink的allowedLateness可用于TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,这可能使得窗口再次被触发,相当于对前一次窗口的窗口的修正(累加计算或者累加撤回计算);
注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness的时间,窗口的数据及元数据信息才会被删除。再次计算就是DataFlow模型中的Accumulating的情况。
同时,对于sessionWindow的情况,当late element在allowedLateness范围之内到达时,可能会引起窗口的merge,这样之前窗口的数据会在新窗口中累加计算,这就是DataFlow模型中的AccumulatingAndRetracting的情况。
7. Watermark传播生产任务的pipeline中通常有多个stage,在源头产生的watermark会在pipeline的多个stage间传递。了解watermark如何在一个pipeline的多个stage间进行传递,可以更好的了解watermark对整个pipeline的影响,以及对pipeline结果延时的影响。我们在pipeline的各stage的边界上对watermark做如下定义:
输入watermark(An input watermark):捕捉上游各阶段数据处理进度。对源头算子,input watermark是个特殊的function,对进入的数据产生watermark。对非源头算子,input watermark是上游stage中,所有shard/partition/instance产生的最小的watermark
输出watermark(An output watermark):捕捉本stage的数据进度,实质上指本stage中,所有input watermark的最小值,和本stage中所有非late event的数据的event time。比如,该stage中,被缓存起来等待做聚合的数据等。
每个stage内的操作并不是线性递增的。概念上,每个stage的操作都可以被分为几个组件(components),每个组件都会影响pipeline的输出watermark。每个组件的特性与具体的实现方式和包含的算子相关。理论上,这类算子会缓存数据,直到触发某个计算。比如缓存一部分数据并将其存入状态(state)中,直到触发聚合计算,并将计算结果写入下游stage。
watermark可以是以下项的最小值:
每个source的watermark(Per-source watermark) - 每个发送数据的stage.
每个外部数据源的watermark(Per-external input watermark) - pipeline之外的数据源
每个状态组件的watermark(Per-state component watermark) - 每种需要写入的state类型
每个输出buffer的watermark(Per-output buffer watermark) - 每个接收stage