最后是处理 Watermark 的简要流程(OneInputStreamTask为例)
* -- OneInputStreamTask.invoke() * | * +----> StreamTask.init * | 把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor * +----> StreamTask.runMailboxLoop * | 从 StreamTask.runMailboxLoop 开始,下面是一层层的调用关系 * -----> StreamTask.processInput() * -----> StreamTask.inputProcessor.processInput() * -----> StreamOneInputProcessor.processInput * -----> input.emitNext(output) * -----> StreamTaskNetworkInput.emitNext() * -----> StreamTaskNetworkInput.processElement() * 下面是处理普通 Record * -- StreamTaskNetworkInput.processElement() * | * | 下面都是一层层的调用关系 * -----> output.emitRecord(recordOrMark.asRecord()) * -----> StreamTaskNetworkOutput.emitRecord() * -----> operator.processElement(record) * 进入具体算子 processElement 的处理,比如StreamFlatMap.processElement * -----> StreamFlatMap.processElement(record) * -----> userFunction.flatMap() * -- 下面是处理 Watermark * -- StreamTaskNetworkInput.processElement() * | * | 下面都是一层层的调用关系 * -----> StatusWatermarkValve.inputWatermark() * -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels() * -----> output.emitWatermark() * -----> StreamTaskNetworkOutput.emitWatermark() * -----> operator.processWatermark(watermark) * -----> KeyedProcessOperator.processWatermark(watermark) * 具体算子processWatermark处理,如WindowOperator/KeyedProcessOperator.processWatermark * 最终进入基类AbstractStreamOperator.processWatermark * -----> AbstractStreamOperator.processWatermark(watermark) * -----> timeServiceManager.advanceWatermark(mark); 第一步处理watermark * output.emitWatermark(mark) 第二步将watermark发送到下游 * -----> InternalTimeServiceManager.advanceWatermark * -----> 下面看看第一步处理watermark * -----> InternalTimerServiceImpl.advanceWatermark * 逻辑timer时间小于watermark的都应该被触发回调。从eventTimeTimersQueue从小到大取timer,如果小于传入的water mark,那么说明这个window需要触发。注意watermarker是没有key的,所以当一个watermark来的时候是会触发所有timer,而timer的key是不一定的,所以这里一定要设置keyContext,否则就乱了 * -----> triggerTarget.onEventTime(timer); * triggerTarget是具体operator对象,open时通过InternalTimeServiceManager.getInternalTimerService传递到HeapInternalTimerService * -----> KeyedProcessOperator.onEeventTime() * 调用用户实现的keyedProcessFunction.onTimer去做具体事情。对于window来说也是调用onEventTime或者onProcessTime来从key和window對應的状态中的数据发送到windowFunction中去计算并发送到下游节点 * -----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer); * -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector); * -- DataStream 设置定时发送Watermark,是加了个chain的TimestampsAndPeriodicWatermarksOperator * -- StreamTaskNetworkInput.processElement() * -----> TimestampsAndPeriodicWatermarksOperator.processElement * 会调用AssignerWithPeriodicWatermarks.extractTimestamp提取event time * 然后更新StreamRecord的时间 * -----> WindowOperator.processElement * 在windowAssigner.assignWindows时以element的timestamp作为assign时间 0x07 处理 Watermark 的详细流程(源码分析)下面代码分析略冗长。
我们再看看样例代码
DataStream<String> text = env.socketTextStream(hostname, port); DataStream counts = text .filter(new FilterClass()) .map(new LineSplitter()) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator) .keyBy(0) .timeWindow(Time.seconds(10)) .sum(2) counts.print() System.out.println(env.getExecutionPlan()); 1. 程序逻辑 DataStream & Transformation首先看看逻辑API。
DataStream是数据流概念。A DataStream represents a stream of elements of the same type。
Transformation是一个逻辑API概念。Transformation代表了流的转换,将一个或多个DataStream转换为新的DataStream。A Transformation is applied on one or more data streams or data sets and results in one or more output data streams or data sets。
我们认为Transformation就是逻辑算子,而 Transformation 对应的物理概念是Operators。
DataStream类在内部组合了一个 Transformation类,实际的转换操作均通过该类完成,描述了这个DataStream是怎么来的。