每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。
EventTime 数据本身携带的时间,默认的时间属性;
ProcessingTime 处理时间;
IngestionTime 数据进入flink程序的时间;
Tumbling windows(滚动窗口)滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。
下面示例以滚动时间窗口(TumblingEventTimeWindows)为例,默认模式是TimeCharacteristic.ProcessingTime处理时间
/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;所以如果使用Event Time即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic指定
// 指定使用数据的实际时间 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<T> input = ...; // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 这里减去8小时,表示用UTC世界时间 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>); Sliding windows(滑动窗口)滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。
同理,如果是滑动时间窗口,也是类似的:
// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次 .timeWindow(Time.seconds(10), Time.seconds(5))这里使用的是timeWindow,通常使用window,那么两者的区别是什么呢?
timeWindow其实判断时间的处理模式是ProcessingTime还是SlidingEventTimeWindows,帮我们判断好了,调用方法直接传入(Time size, Time slide)这两个参数就好了,如果是使用.window方法,则需要自己来判断,就是前者写法更简单一些。
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(SlidingProcessingTimeWindows.of(size, slide)); } else { return window(SlidingEventTimeWindows.of(size, slide)); } } Count-Based window (基于计数的窗口)Count Window 是根据元素个数对数据流进行分组的,也分滚动(tumb)和滑动(slide)。
Tumbling Count Window
当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。通过使用 DataStream API,我们可以这样实现:
Sliding Count Window
当然Count Window 也支持 Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是类似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例如下。
SessionWindow中的Gap是一个非常重要的概念,它指的是session之间的间隔。
如果session之间的间隔大于指定的间隔,数据将会被划分到不同的session中。比如,设定5秒的间隔,0-5属于一个session,5-10属于另一个session
DataStream<T> input = ...; // event-time session windows with static gap input .keyBy(<key selector>) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // event-time session windows with dynamic gap input .keyBy(<key selector>) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .<windowed transformation>(<window function>); // processing-time session windows with static gap input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) .<windowed transformation>(<window function>); // processing-time session windows with dynamic gap input .keyBy(<key selector>) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) .<windowed transformation>(<window function>); Global Windows(全局窗口) 总结SlidingEventTimeWindows,
SlidingProcessingTimeWindows,
TumblingEventTimeWindows,
TumblingProcessingTimeWindows
基于时间的滑动窗口
SlidingEventTimeWindows
SlidingProcessingTimeWindows
基于时间的翻滚窗口
TumblingEventTimeWindows
TumblingProcessingTimeWindows
基于计数的滑动窗口
countWindow(100, 10)
基于计数的翻滚窗口
countWindow(100)
会话窗口
会话窗口:一条记录一个窗口
ProcessingTimeSessionWindows
EventTimeSessionWindows
全局窗口(GlobalWindows)
GlobalWindow是一个全局窗口,被实现为单例模式。其maxTimestamp被设置为Long.MAX_VALUE。
该类内部有一个静态类定义了GlobalWindow的序列化器:Serializer。
延迟