彻底搞清Flink中的Window (3)

每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。

EventTime 数据本身携带的时间,默认的时间属性;

ProcessingTime 处理时间;

IngestionTime 数据进入flink程序的时间;

Tumbling windows(滚动窗口)

滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。

tumb-window

下面示例以滚动时间窗口(TumblingEventTimeWindows)为例,默认模式是TimeCharacteristic.ProcessingTime处理时间

/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

所以如果使用Event Time即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic指定

// 指定使用数据的实际时间 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<T> input = ...; // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 这里减去8小时,表示用UTC世界时间 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>); Sliding windows(滑动窗口)

滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

slide-window

同理,如果是滑动时间窗口,也是类似的:

// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次 .timeWindow(Time.seconds(10), Time.seconds(5))

这里使用的是timeWindow,通常使用window,那么两者的区别是什么呢?

timeWindow其实判断时间的处理模式是ProcessingTime还是SlidingEventTimeWindows,帮我们判断好了,调用方法直接传入(Time size, Time slide)这两个参数就好了,如果是使用.window方法,则需要自己来判断,就是前者写法更简单一些。

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(SlidingProcessingTimeWindows.of(size, slide)); } else { return window(SlidingEventTimeWindows.of(size, slide)); } } Count-Based window (基于计数的窗口)

Count Window 是根据元素个数对数据流进行分组的,也分滚动(tumb)和滑动(slide)。

Tumbling Count Window
当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。通过使用 DataStream API,我们可以这样实现:

// Stream of (userId, buyCnts) val buyCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by sensorId .keyBy(0) // tumbling count window of 100 elements size .countWindow(100) // compute the buyCnt sum .sum(1)

Sliding Count Window
当然Count Window 也支持 Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // sliding count window of 100 elements size and 10 elements trigger interval .countWindow(100, 10) .sum(1) 会话(session)窗口

SessionWindow中的Gap是一个非常重要的概念,它指的是session之间的间隔。

如果session之间的间隔大于指定的间隔,数据将会被划分到不同的session中。比如,设定5秒的间隔,0-5属于一个session,5-10属于另一个session

session-window

DataStream<T> input = ...; // event-time session windows with static gap input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // event-time session windows with dynamic gap input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .<windowed transformation>(<window function>); // processing-time session windows with static gap input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // processing-time session windows with dynamic gap input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .<windowed transformation>(<window function>); Global Windows(全局窗口)

global-window

总结

SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows

基于时间的滑动窗口

SlidingEventTimeWindows

SlidingProcessingTimeWindows

基于时间的翻滚窗口

TumblingEventTimeWindows

TumblingProcessingTimeWindows

基于计数的滑动窗口

countWindow(100, 10)

基于计数的翻滚窗口

countWindow(100)

会话窗口
会话窗口:一条记录一个窗口

ProcessingTimeSessionWindows

EventTimeSessionWindows

全局窗口(GlobalWindows)

GlobalWindow是一个全局窗口,被实现为单例模式。其maxTimestamp被设置为Long.MAX_VALUE。

该类内部有一个静态类定义了GlobalWindow的序列化器:Serializer。

延迟

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpwzpd.html