[源码分析] 从源码入手看 Flink Watermark 之传播过程 (12)

这里重要的是把 SourceStreamTask / OneInputStreamTask 添加到StreamNode上,作为 jobVertexClass。

@Internal public class StreamGraph implements Pipeline { public <IN, OUT> void addOperator( Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperatorFactory<OUT> operatorFactory, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) { // 这里添加了 OneInputStreamTask/SourceStreamTask,这个是日后真实运行的地方。 if (operatorFactory.isStreamSource()) { addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName); } else { addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName); } } protected StreamNode addNode(Integer vertexID, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, Class<? extends AbstractInvokable> vertexClass, // 这里是OneInputStreamTask... StreamOperatorFactory<?> operatorFactory, String operatorName) { StreamNode vertex = new StreamNode( vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, new ArrayList<OutputSelector<?>>(), vertexClass); streamNodes.put(vertexID, vertex); return vertex; } } 关键类StreamNode @Internal public class StreamNode implements Serializable { private transient StreamOperatorFactory<?> operatorFactory; private List<OutputSelector<?>> outputSelectors; private List<StreamEdge> inEdges = new ArrayList<StreamEdge>(); private List<StreamEdge> outEdges = new ArrayList<StreamEdge>(); private final Class<? extends AbstractInvokable> jobVertexClass; // OneInputStreamTask @VisibleForTesting public StreamNode( Integer id, @Nullable String slotSharingGroup, @Nullable String coLocationGroup, StreamOperator<?> operator, String operatorName, List<OutputSelector<?>> outputSelector, Class<? extends AbstractInvokable> jobVertexClass) { this(id, slotSharingGroup, coLocationGroup, SimpleOperatorFactory.of(operator), operatorName, outputSelector, jobVertexClass); } public Class<? extends AbstractInvokable> getJobVertexClass() { return jobVertexClass; } } 3. Task之间数据交换机制

Flink中的数据交换构建在如下两条设计原则之上:

数据交换的控制流(例如,为实例化交换而进行的消息传输)是接收端初始化的,这非常像最初的MapReduce。

数据交换的数据流(例如,在网络上最终传输的数据)被抽象成一个叫做IntermediateResult的概念,它是可插拔的。这意味着系统基于相同的实现逻辑可以既支持流数据,又支持批处理数据的传输。

数据在task之间传输整体过程

第一步必然是准备一个ResultPartition;

通知JobMaster;

JobMaster通知下游节点;如果下游节点尚未部署,则部署之;

下游节点向上游请求数据

开始传输数据

数据在task之间具体传输

描述了数据从生产者传输到消费者的完整生命周期。

数据在task之间传递有如下几步:

数据在本operator处理完后,通过Collector收集,这些记录被传给RecordWriter对象。每条记录都要选择一个下游节点,所以要经过ChannelSelector。一个ChannelSelector选择一个或者多个序列化器来处理记录。如果记录在broadcast中,它们将被传递给每一个序列化器。如果记录是基于hash分区的,ChannelSelector将会计算记录的hash值,然后选择合适的序列化器。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzwfyx.html