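StatusWatermarkValve is the component that processes watermarks on an operator's input. It receives watermarks (and stream statuses) from all input channels and decides which single combined watermark to forward downstream.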
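The merging rule the valve applies can be reproduced in a few lines outside Flink. What follows is a simplified, hypothetical model: the names `ValveModel`, `markIdle`, `markActive` and `emitted` are ours, not Flink's. The model keeps four rules:

- Each channel keeps its highest watermark.
- Idle channels are excluded.
- A resumed channel counts again only once it has caught up.
- The minimum over aligned channels is forwarded only when it advances.

The overall "valve is idle" check and the real `DataOutput` are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the watermark-merging rule in Flink's StatusWatermarkValve.
// NOT Flink code: ValveModel, markIdle and markActive are hypothetical names, the
// overall valve status is not modeled, and emitted watermarks go into a list
// instead of a DataOutput.
class ValveModel {
    private final long[] channelWatermarks;
    private final boolean[] active;
    private final boolean[] aligned;
    private long lastOutputWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    ValveModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        active = new boolean[numChannels];
        aligned = new boolean[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelWatermarks[i] = Long.MIN_VALUE; // nothing received yet
            active[i] = true;
            aligned[i] = true;
        }
    }

    void inputWatermark(long watermark, int channel) {
        if (!active[channel]) return;                        // idle channel: ignore
        if (watermark <= channelWatermarks[channel]) return; // no advance: ignore
        channelWatermarks[channel] = watermark;
        // An unaligned channel rejoins once it has caught up with the output.
        if (!aligned[channel] && watermark >= lastOutputWatermark) {
            aligned[channel] = true;
        }
        emitMinAcrossAlignedChannels();
    }

    void markIdle(int channel) {
        if (!active[channel]) return;
        // An idle channel stops holding back the combined watermark.
        active[channel] = false;
        aligned[channel] = false;
        emitMinAcrossAlignedChannels();
    }

    void markActive(int channel) {
        if (active[channel]) return;
        active[channel] = true;
        // Realign right away only if this channel is not behind the output.
        if (channelWatermarks[channel] >= lastOutputWatermark) {
            aligned[channel] = true;
        }
    }

    private void emitMinAcrossAlignedChannels() {
        long newMin = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (aligned[i]) {
                hasAligned = true;
                newMin = Math.min(newMin, channelWatermarks[i]);
            }
        }
        // Emit only if some channel is aligned and the watermark moves forward.
        if (hasAligned && newMin > lastOutputWatermark) {
            lastOutputWatermark = newMin;
            emitted.add(newMin);
        }
    }

    public static void main(String[] args) {
        ValveModel valve = new ValveModel(2);
        valve.inputWatermark(5, 0);  // channel 1 still at MIN_VALUE: nothing emitted
        valve.inputWatermark(3, 1);  // min(5, 3) = 3
        valve.inputWatermark(7, 1);  // min(5, 7) = 5
        valve.markIdle(0);           // only channel 1 (at 7) is aligned
        valve.markActive(0);         // channel 0 is at 5 < 7: stays unaligned
        valve.inputWatermark(8, 0);  // 8 >= 7: realigns, min(8, 7) = 7 is no advance
        valve.inputWatermark(10, 1); // min(8, 10) = 8
        System.out.println(valve.emitted); // prints [3, 5, 7, 8]
    }
}
```

Nothing is emitted until every active channel has reported at least once. A channel that has not reported still sits at `Long.MIN_VALUE`, which pins the minimum.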
    @Internal
    public class StatusWatermarkValve {

        private final DataOutput output;

        /** Current status of every input channel. */
        private final InputChannelStatus[] channelStatuses;

        /** The last watermark emitted from the valve. */
        private long lastOutputWatermark;

        /** The last stream status emitted from the valve. */
        private StreamStatus lastOutputStreamStatus;

        // ... constructor and stream-status handling omitted ...

        public void inputWatermark(Watermark watermark, int channelIndex) throws Exception {
            // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
            if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
                long watermarkMillis = watermark.getTimestamp();

                // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
                if (watermarkMillis > channelStatuses[channelIndex].watermark) {
                    channelStatuses[channelIndex].watermark = watermarkMillis;

                    // previously unaligned input channels are now aligned if its watermark has caught up
                    if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
                        channelStatuses[channelIndex].isWatermarkAligned = true;
                    }

                    // now, attempt to find a new min watermark across all aligned channels
                    findAndOutputNewMinWatermarkAcrossAlignedChannels();
                }
            }
        }

        private void findAndOutputNewMinWatermarkAcrossAlignedChannels() throws Exception {
            long newMinWatermark = Long.MAX_VALUE;
            boolean hasAlignedChannels = false;

            // determine new overall watermark by considering only watermark-aligned channels across all channels
            for (InputChannelStatus channelStatus : channelStatuses) {
                if (channelStatus.isWatermarkAligned) {
                    hasAlignedChannels = true;
                    newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
                }
            }

            // we acknowledge and output the new overall watermark if it really is aggregated
            // from some remaining aligned channel, and is also larger than the last output watermark
            if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
                lastOutputWatermark = newMinWatermark;
                output.emitWatermark(new Watermark(lastOutputWatermark)); // this is where the watermark is finally emitted downstream
            }
        }
    }

6. Generating Watermarks

Watermarks are produced in one of two places. Either Apache Flink's Source nodes generate them, or a watermark generator computes them, for example Flink's built-in periodic watermark implementation.
There are two ways to assign timestamps and generate Watermarks:
Directly in the data stream source: a custom source assigns the timestamps and emits the watermarks itself.
Via a TimestampAssigner / WatermarkGenerator: timestamps and watermarks are assigned inside the data stream, after the source.
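In Flink 1.11 and later, the second approach is typically written with `WatermarkStrategy.forBoundedOutOfOrderness(Duration)` plus `withTimestampAssigner(...)`, passed to `DataStream.assignTimestampsAndWatermarks(...)`. To keep the example runnable without Flink on the classpath, here is a standalone model instead. The names `BoundedOutOfOrdernessModel`, `Event`, `onEvent` and `currentWatermark` are hypothetical. The model has two parts:

- A timestamp assigner extracts event time from each record.
- A periodic generator emits "largest timestamp seen minus the allowed out-of-orderness minus 1".

As far as we understand, this formula matches Flink's built-in bounded-out-of-orderness generator.

```java
import java.util.function.ToLongFunction;

// Hypothetical event type used only for this sketch.
class Event {
    final String key;
    final long timestamp; // event time in epoch milliseconds

    Event(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

// Standalone model (NOT the Flink API) of assigning timestamps and watermarks
// inside the stream: a timestamp assigner extracts event time per record, and a
// periodic generator emits "max timestamp seen - out-of-orderness - 1".
class BoundedOutOfOrdernessModel<T> {
    private final ToLongFunction<T> timestampAssigner;
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(ToLongFunction<T> timestampAssigner, long outOfOrdernessMillis) {
        this.timestampAssigner = timestampAssigner;
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that currentWatermark() starts at Long.MIN_VALUE without overflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Per record: extract the timestamp and remember the largest one seen so far.
    long onEvent(T record) {
        long ts = timestampAssigner.applyAsLong(record);
        maxTimestamp = Math.max(maxTimestamp, ts);
        return ts;
    }

    // Called periodically (e.g. from a timer): the watermark to emit right now.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel<Event> gen =
                new BoundedOutOfOrdernessModel<Event>(e -> e.timestamp, 2000);
        System.out.println(gen.currentWatermark() == Long.MIN_VALUE); // prints true
        gen.onEvent(new Event("a", 1000));
        System.out.println(gen.currentWatermark()); // prints -1001
        gen.onEvent(new Event("b", 5000));
        gen.onEvent(new Event("c", 3000)); // out of order, but within the 2 s bound
        System.out.println(gen.currentWatermark()); // prints 2999
    }
}
```

The event with timestamp 3000 arrives after 5000, yet it is still ahead of the watermark 2999, so it is not treated as late. Only records more than 2 s behind the largest timestamp seen would be.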
Setting Timestamps/Watermarks in a Custom Data Source