自定义的数据源类需要继承并实现 SourceFunction[T] 接口,其中 run 方法是定义数据生产的地方:
//自定义的数据源为自定义类型MyType class MySource extends SourceFunction[MyType]{ //重写run方法,定义数据生产的逻辑 override def run(ctx: SourceContext[MyType]): Unit = { while (/* condition */) { val next: MyType = getNext() //设置timestamp从MyType的哪个字段获取(eventTimestamp) ctx.collectWithTimestamp(next, next.eventTimestamp) if (next.hasWatermarkTime) { //设置watermark从MyType的那个方法获取(getWatermarkTime) ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } } } 在数据流中设置 Timestamp/Watermark在数据流中,可以设置 stream 的 Timestamp Assigner ,该 Assigner 将会接收一个 stream,并生产一个带 Timestamp和Watermark 的新 stream。
Flink通过水位线分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事件流中注入水位线。元素在streaming dataflow引擎中流动到WindowOperator时,会被分为两拨,分别是普通事件和水位线。
回到实例代码,assignTimestampsAndWatermarks 就是生成一个TimestampsAndPeriodicWatermarksOperator。
TimestampsAndPeriodicWatermarksOperator的具体处理 Watermark代码如下。其中processWatermark具体是阻断上游水位线,这样下游就只能用自身产生的水位线了。
public class TimestampsAndPeriodicWatermarksOperator<T> extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>> implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { private transient long watermarkInterval; private transient long currentWatermark; //可以看到在processElement会调用AssignerWithPeriodicWatermarks.extractTimestamp提取event time, 然后更新StreamRecord的时间。 @Override public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); } @Override public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); //定时调用用户自定义的getCurrentWatermark if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } @Override public void processWatermark(Watermark mark) throws Exception { // if we receive a Long.MAX_VALUE watermark we forward it since it is used // to signal the end of input and to not block watermark progress downstream if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); } } } 7. WindowOperator的实现最后的 .keyBy(0) .timeWindow(Time.seconds(10)) 是由 WindowOperator处理。
Flink通过水位线分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator这两个算子)向事件流中注入水位线。元素在streaming dataflow引擎中流动到WindowOperator时,会被分为两拨,分别是普通事件和水位线。
如果是普通的事件,则会调用processElement方法进行处理,在processElement方法中,首先会利用窗口分配器为当前接收到的元素分配窗口,接着会调用触发器的onElement方法进行逐元素触发。对于时间相关的触发器,通常会注册事件时间或者处理时间定时器,这些定时器会被存储在WindowOperator的处理时间定时器队列和水位线定时器队列中,如果触发的结果是FIRE,则对窗口进行计算。