[源码分析] 从源码入手看 Flink Watermark 之传播过程 (13)

每个channel都有一个serializer,序列化器将record数据记录序列化成二进制的表示形式。然后将它们放到大小合适的buffer中(记录也可以被切割到多个buffer中)。

接下来数据被写入ResultPartition下的各个subPartition (ResultSubpartition - RS,用于为特定的消费者收集buffer数据)里,此时该数据已经存入DirectBuffer(MemorySegment)。既然首个buffer进来了,RS就对消费者变成可访问的状态了(注意,这个行为实现了一个streaming shuffle),然后它通知JobManager。

JobManager查找RS的消费者,然后通知TaskManager一个数据块已经可以访问了。通知TM2的消息会被发送到InputChannel,该inputchannel被认为是接收这个buffer的,接着通知RS2可以初始化一个网络传输了。然后,RS2通过TM1的网络栈请求该buffer,然后双方基于netty准备进行数据传输。网络连接是在TaskManager(而非特定的task)之间长时间存在的。

单独的线程控制数据的flush速度,一旦触发flush,则通过Netty的nio通道向对端写入。

对端的netty client接收到数据,decode出来,把数据拷贝到buffer里,然后通知InputChannel

一旦buffer被TM2接收,它会穿过一个类似的对象栈,起始于InputChannel(接收端 等价于IRPQ),进入InputGate(它包含多个IC),最终进入一个RecordDeserializer,它用于从buffer中还原成类型化的记录,然后将其传递给接收task。

有可用的数据时,下游算子从阻塞醒来。从InputChannel取出buffer,再解序列化成record,交给算子执行用户代码。

4. 数据源的逻辑——StreamSource与时间模型

SourceFunction是所有stream source的根接口。

StreamSource抽象了一个数据源,并且指定了一些如何处理数据的模式。StreamSource是用来开启整个流的算子。SourceFunction定义了两个接口方法:

run : 启动一个source,即对接一个外部数据源然后emit元素形成stream(大部分情况下会通过在该方法里运行一个while循环的形式来产生stream)。
cancel : 取消一个source,也即将run中的循环emit元素的行为终止。

@Public public interface SourceFunction<T> extends Function, Serializable { void run(SourceContext<T> ctx) throws Exception; void cancel(); @Public // Interface might be extended in the future with additional methods. //SourceContex则是用来进行数据发送的接口。 interface SourceContext<T> { void collect(T element); @PublicEvolving void collectWithTimestamp(T element, long timestamp); @PublicEvolving void emitWatermark(Watermark mark); @PublicEvolving void markAsTemporarilyIdle(); Object getCheckpointLock(); void close(); } } public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> { //读到数据后,把数据交给collect方法,collect方法负责把数据交到合适的位置(如发布为br变量,或者交给下个operator,或者通过网络发出去) private transient SourceFunction.SourceContext<OUT> ctx; private transient volatile boolean canceledOrStopped = false; private transient volatile boolean hasSentMaxWatermark = false; public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer, final Output<StreamRecord<OUT>> collector, final OperatorChain<?, ?> operatorChain) throws Exception { userFunction.run(ctx); } } SocketTextStreamFunction

回到实例代码,env.socketTextStream(hostname, port)就是生成了SocketTextStreamFunction。

run方法的逻辑如上,逻辑很清晰,就是从指定的hostname和port持续不断的读取数据,按行分隔符划分成一个个字符串,然后转发到下游。

cancel方法的实现如下,就是将运行状态的标识isRunning属性设置为false,并根据需要关闭当前socket。

@PublicEvolving public class SocketTextStreamFunction implements SourceFunction<String> { private final String hostname; private final int port; private final String delimiter; private final long maxNumRetries; private final long delayBetweenRetries; private transient Socket currentSocket; private volatile boolean isRunning = true; public SocketTextStreamFunction(String hostname, int port, String delimiter, long maxNumRetries) { this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); } public void run(SourceContext<String> ctx) throws Exception { final StringBuilder buffer = new StringBuilder(); long attempt = 0; /** 这里是第一层循环,只要当前处于运行状态,该循环就不会退出,会一直循环 */ while (isRunning) { try (Socket socket = new Socket()) { /** 对指定的hostname和port,建立Socket连接,并构建一个BufferedReader,用来从Socket中读取数据 */ currentSocket = socket; LOG.info("Connecting to server socket " + hostname + ':' + port); socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); char[] cbuf = new char[8192]; int bytesRead; /** 这里是第二层循环,对运行状态进行了双重校验,同时对从Socket中读取的字节数进行判断 */ while (isRunning && (bytesRead = reader.read(cbuf)) != -1) { buffer.append(cbuf, 0, bytesRead); int delimPos; /** 这里是第三层循环,就是对从Socket中读取到的数据,按行分隔符进行分割,并将每行数据作为一个整体字符串向下游转发 */ while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) { String record = buffer.substring(0, delimPos); if (delimiter.equals("\n") && record.endsWith("\r")) { record = record.substring(0, record.length() - 1); } /** 用入参ctx,进行数据的转发 */ ctx.collect(record); buffer.delete(0, delimPos + delimiter.length()); } } } /** 如果由于遇到EOF字符,导致从循环中退出,则根据运行状态,以及设置的最大重试尝试次数,决定是否进行 sleep and retry,或者直接退出循环 */ if (isRunning) { attempt++; if (maxNumRetries == -1 || attempt < maxNumRetries) { LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs..."); Thread.sleep(delayBetweenRetries); } else { break; } } } /** 在最外层的循环都退出后,最后检查下缓存中是否还有数据,如果有,则向下游转发 */ if (buffer.length() > 0) { ctx.collect(buffer.toString()); } } public void cancel() { isRunning = false; Socket theSocket = this.currentSocket; /** 如果当前socket不为null,则进行关闭操作 */ if (theSocket != null) { IOUtils.closeSocket(theSocket); } } } 5. StreamTask

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html