Operator Chain是指在生成JobGraph阶段,将Job中的Operators按照一定策略(例如:single output operator可以chain在一起)链接起来并放置在一个Task线程中执行。减少了数据传递/线程切换等环节,降低系统开销的同时增加了资源利用率和Job性能。
chained operators实际上是从下游往上游去反向一个个创建和setup的。假设chained operators为:StreamGroupedReduce - StreamFilter - StreamSink,而实际初始化顺序则相反:StreamSink - StreamFilter - StreamGroupedReduce。
* OperatorChain( * StreamTask<OUT, OP> containingTask, * RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) * {@code * -- collect * | * +----> pushToOperator(StreamRecord<X> record) * +---------> operator.processElement(castRecord); * //这里的operator是chainedOperator,即除了headOperator之外,剩余的operators的chain。 * //这个operator.processElement,会循环调用operator chain所有operator,直到chain end。 * //比如 Operator A 对应的 ChainingOutput collect 调用了对应的算子 A 的 processElement 方法,这里又会调用 B 的 ChainingOutput 的 collect 方法,以此类推。这样便实现了可 chain 算子的本地处理,最终经由网络输出 RecordWriterOutput 发送到下游节点。 5. StreamOperatorStreamTask会调用Operator,所以我们需要看看Operator的生命周期。
逻辑算子Transformation最后会对应到物理算子Operator,这个概念对应的就是StreamOperator。
StreamOperator是根接口。对于 Streaming 来说所有的算子都继承自 StreamOperator。继承了StreamOperator的扩展接口则有OneInputStreamOperator,TwoInputStreamOperator。实现了StreamOperator的抽象类有AbstractStreamOperator以及它的子类AbstractStreamUdfOperator。
其中operator处理输入的数据(elements)可以是以下之一:input element,watermark和checkpoint barriers。他们中的每一个都有一个特殊的单元来处理。element由processElement()方法处理,watermark由processWatermark()处理,checkpoint barriers由异步调用的snapshotState()方法处理,此方法会触发一次checkpoint 。
processElement()方法也是UDF的逻辑被调用的地方,例如MapFunction里的map()方法。
* AbstractUdfStreamOperator, which is the basic class for all operators that execute UDFs. * * // initialization phase * //初始化operator-specific方法,如RuntimeContext和metric collection * OPERATOR::setup * UDF::setRuntimeContext * //setup的调用链是invoke(StreamTask) -> constructor(OperatorChain) -> setup * //调用setup时,StreamTask已经在各个TaskManager节点上 * //给出一个用来初始state的operator * * OPERATOR::initializeState * //执行所有operator-specific的初始化 * OPERATOR::open * UDF::open * * // processing phase (called on every element/watermark) * OPERATOR::processElement * UDF::run //给定一个operator可以有一个用户定义的函数(UDF) * OPERATOR::processWatermark * * // checkpointing phase (called asynchronously on every checkpoint) * OPERATOR::snapshotState * * // termination phase * OPERATOR::close * UDF::close * OPERATOR::dispose