[源码分析] 从源码入手看 Flink Watermark 之传播过程 (15)

前面提到,Task 对象在执行过程中,会把具体的任务交给 StreamTask 去执行。在我们的 WordCount 例子中,实际初始化的是 OneInputStreamTask 对象。那么这个对象是如何执行用户代码的呢?

它主要做了以下几件事:

首先,初始化各个算子的状态(initialize-operator-states())。

然后,打开各个算子(open-operators())。

最后,调用 StreamTask#runMailboxLoop,开始处理从上游(例如 Source)消费到的数据,处理结果再流入下游算子。

具体来说,就是把任务交给 inputProcessor,由它执行 processInput 方法。inputProcessor 是一个 StreamInputProcessor 实例,负责处理输入数据,包括用户数据(record)、watermark、latency marker、stream status,以及 checkpoint barrier 等。

具体到 OneInputStreamTask,其 inputProcessor 是 StreamOneInputProcessor 类型,它把 input 和 output 组合在一起:input 是 StreamTaskNetworkInput 类型,output 是 StreamTaskNetworkOutput 类型。

具体代码如下(节选):

/*
 * Abridged excerpts of several Flink runtime classes, as quoted in this article. Members that are
 * not on the record/watermark path are omitted, so this is not compilable as-is: for example
 * `input`, `output` and `operatorChain` in StreamOneInputProcessor, and `currentWatermark` /
 * `triggerTarget` in InternalTimerServiceImpl, are declared outside this excerpt.
 *
 * Watermark path shown here:
 *   StreamTaskNetworkInput.emitNext -> processElement -> StatusWatermarkValve.inputWatermark
 *   -> (valve, implementation not shown) -> StreamTaskNetworkOutput.emitWatermark
 *   -> operator.processWatermark -> AbstractStreamOperator.processWatermark
 *   -> InternalTimeServiceManager.advanceWatermark -> InternalTimerServiceImpl.advanceWatermark
 *      (fires due event-time timers)
 *   -> output.emitWatermark (forwards the watermark downstream)
 */

/**
 * Stream task for operators with a single input. init() wires a network input (reading from the
 * checkpointed input gate) and a data output (feeding the head operator) into a
 * StreamOneInputProcessor.
 */
@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

    // init() of OneInputStreamTask: reads the input configuration from the StreamConfig and builds
    // this task's own inputProcessor.
    @Override
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();
        int numberOfInputs = configuration.getNumberOfInputs();

        // An input processor is only created when the task actually has inputs.
        if (numberOfInputs > 0) {
            CheckpointedInputGate inputGate = createCheckpointedInputGate();
            DataOutput<IN> output = createDataOutput(); // this creates the StreamTaskNetworkOutput
            StreamTaskInput<IN> input = createTaskInput(inputGate, output);
            inputProcessor = new StreamOneInputProcessor<>(
                // the processor ties the input and the output together
                input,
                output,
                operatorChain);
        }
        // Expose the current input watermark as a gauge on the head operator's metric group.
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    }

    /**
     * Creates the network input for this task.
     *
     * <p>The StatusWatermarkValve is sized by the number of input channels and is given the same
     * {@code output} that receives records. Watermarks and stream-status elements therefore reach
     * the output through the valve rather than directly (see StreamTaskNetworkInput#processElement).
     *
     * @param inputGate the checkpointed gate the input reads buffers and events from
     * @param output the data output that elements are emitted to
     * @return a StreamTaskNetworkInput over {@code inputGate}
     */
    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate, DataOutput<IN> output) {
        int numberOfInputChannels = inputGate.getNumberOfInputChannels();
        StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels, output);

        // `configuration` here is a member of the task (declared outside this excerpt), not the
        // local variable of init().
        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        // NOTE(review): the trailing 0 is presumably the index of this single input — confirm
        // against the StreamTaskNetworkInput constructor.
        return new StreamTaskNetworkInput<>(
            inputGate,
            inSerializer,
            getEnvironment().getIOManager(),
            statusWatermarkValve,
            0);
    }

    /**
     * The network data output implementation used for processing stream elements
     * from {@link StreamTaskNetworkInput} in one input processor.
     *
     * <p>Each emit method forwards the element to the corresponding method of {@code operator}.
     */
    private static class StreamTaskNetworkOutput<IN> extends AbstractDataOutput<IN> {

        private final OneInputStreamOperator<IN, ?> operator;

        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;

        private StreamTaskNetworkOutput(
                OneInputStreamOperator<IN, ?> operator, // the registered operator that elements are forwarded to
                StreamStatusMaintainer streamStatusMaintainer,
                WatermarkGauge watermarkGauge,
                Counter numRecordsIn) {
            super(streamStatusMaintainer);
            this.operator = checkNotNull(operator);
            this.watermarkGauge = checkNotNull(watermarkGauge);
            this.numRecordsIn = checkNotNull(numRecordsIn);
        }

        // Counts the record, lets the operator set its key context from the record, then runs the
        // operator's processing logic (where the user code executes) on it.
        @Override
        public void emitRecord(StreamRecord<IN> record) throws Exception {
            numRecordsIn.inc();
            operator.setKeyContextElement1(record);
            operator.processElement(record);
        }

        // Updates watermarkGauge with the watermark's timestamp, then hands the watermark to the
        // operator. NOTE(review): watermarkGauge is presumably the task's input-watermark gauge
        // passed in by createDataOutput() (not shown) — confirm.
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            operator.processWatermark(watermark); // enters the operator's processWatermark handling, e.g. in WindowOperator
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            operator.processLatencyMarker(latencyMarker);
        }
    }
}

/** Processes the input of a stream task; each processInput() call reports an InputStatus. */
@Internal
public interface StreamInputProcessor extends AvailabilityProvider, Closeable {

    InputStatus processInput() throws Exception;
}

/** Input processor for a single input: moves elements from {@code input} to {@code output}. */
@Internal
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {

    // Emits at most one element from the input per call. When the input reports END_OF_INPUT, the
    // operator chain is told that input 1 of the head operator has ended.
    @Override
    public InputStatus processInput() throws Exception {
        // Reads the next element from the input. `input` and `output` are the
        // StreamTaskNetworkInput and the StreamTaskNetworkOutput respectively.
        InputStatus status = input.emitNext(output);

        if (status == InputStatus.END_OF_INPUT) {
            operatorChain.endHeadOperatorInput(1);
        }

        return status;
    }
}

/** Task input that deserializes stream elements from the buffers of a checkpointed input gate. */
@Internal
public final class StreamTaskNetworkInput<T> implements StreamTaskInput<T> {

    /**
     * Emits at most one stream element to {@code output}.
     *
     * <p>First drains the current record deserializer. If it yields a full element, that element
     * is processed and MORE_AVAILABLE is returned. Otherwise the next buffer or event is polled
     * from the checkpointed input gate, and the loop repeats.
     *
     * @return MORE_AVAILABLE after one element was processed, END_OF_INPUT once the gate is
     *     finished, or NOTHING_AVAILABLE when the gate currently has nothing to poll
     * @throws IllegalStateException if the gate is finished but still holds data
     */
    @Override
    public InputStatus emitNext(DataOutput<T> output) throws Exception {

        while (true) {
            // get the stream element from the deserializer
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
                if (result.isBufferConsumed()) {
                    // The current buffer is fully read: recycle it and drop the deserializer, so
                    // that the next buffer is polled from the gate below.
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    processElement(deserializationDelegate.getInstance(), output); // handle the element according to its type
                    return InputStatus.MORE_AVAILABLE;
                }
            }

            Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
            if (bufferOrEvent.isPresent()) {
                // NOTE(review): processBufferOrEvent is not shown; presumably it installs a
                // deserializer for data buffers and handles events.
                processBufferOrEvent(bufferOrEvent.get());
            } else {
                if (checkpointedInputGate.isFinished()) {
                    checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                    if (!checkpointedInputGate.isEmpty()) {
                        throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                    }
                    return InputStatus.END_OF_INPUT;
                }
                return InputStatus.NOTHING_AVAILABLE;
            }
        }
    }

    // Dispatches a deserialized element by type: record, watermark, latency marker or stream
    // status. Records and latency markers go straight to the output. Watermarks and stream status
    // go through the StatusWatermarkValve together with lastChannel (presumably the channel the
    // element arrived on).
    private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()){
            output.emitRecord(recordOrMark.asRecord()); // calls StreamTaskNetworkOutput, which ends up in operator.processElement(record);
        } else if (recordOrMark.isWatermark()) {
            // NOTE(review): the valve was built with the same output (see createTaskInput).
            // Presumably it calls output.emitWatermark only when the combined watermark across
            // channels advances — valve implementation not shown.
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
}

/** Base class for stream operators; only the watermark handling is shown here. */
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {

    // Timer services of this operator. May be null, in which case watermarks are only forwarded.
    protected transient InternalTimeServiceManager<?> timeServiceManager;

    /**
     * Default watermark handling. It first advances all timer services to the watermark, firing
     * event-time timers that are due, and then forwards the watermark to this operator's
     * {@code output}. Because of this order, any elements that the fired timer callbacks emit
     * synchronously are sent before the watermark.
     */
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }
}

/** Holds an operator's timer services, keyed by a String (presumably the service name). */
@Internal
public class InternalTimeServiceManager<K> {

    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    // Advances every timer service to the watermark's timestamp.
    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            service.advanceWatermark(watermark.getTimestamp());
        }
    }
}

/** Timer service with separate processing-time and event-time timer queues. */
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {

    private final ProcessingTimeService processingTimeService;

    private final KeyContext keyContext;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

    private final KeyGroupRange localKeyGroupRange;

    private final int localKeyGroupRangeStartIdx;

    /**
     * Records {@code time} as the current watermark, then fires event-time timers whose timestamp
     * is {@code <= time}. For each due timer at the head of the queue, the timer is removed, the
     * current key is switched to the timer's key, and triggerTarget.onEventTime is called.
     *
     * <p>NOTE(review): the loop stops at the first head timer that is not yet due. It therefore
     * relies on the queue ordering timers by timestamp (presumably a min-heap) — confirm.
     */
    public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }
}

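上面的代码中,StreamTaskNetworkOutput.emitRecord 里的 operator.processElement(record); 才是真正执行用户逻辑的代码。watermark 的传播路径则不同:StreamTaskNetworkInput.processElement 不会把 watermark 直接交给 output,而是先交给 StatusWatermarkValve(它在 createTaskInput 中以同一个 output 构造),再由它调用 StreamTaskNetworkOutput.emitWatermark,进而调用 operator.processWatermark。在 AbstractStreamOperator.processWatermark 中,先通过 InternalTimeServiceManager.advanceWatermark 触发所有时间戳不大于该 watermark 的事件时间定时器,再通过 output.emitWatermark 把 watermark 继续发往下游。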

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html