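The operators that receive input data and process it are OneInputStreamOperator, TwoInputStreamOperator, and so on. These two interfaces are very similar: in essence, each handles the three kinds of elements that flow through a stream, namely StreamRecord, Watermark, and LatencyMarker. One is used for a single input stream, the other for two input streams. Every stream operator other than StreamSource must implement exactly one of these interfaces.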
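To make the three element kinds concrete, here is a minimal sketch that does not depend on Flink. Every `Toy*` class, `LoggingOperator`, and the `dispatch` helper are hypothetical stand-ins invented for illustration. They only mimic the shape of a one-input operator with one callback per element kind; they are not how Flink's runtime is actually implemented, and unlike the real interface the toy callbacks do not declare `throws Exception`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-ins for the three element kinds; NOT the real Flink classes.
abstract class ToyElement {}

final class ToyRecord<T> extends ToyElement {
    final T value;
    ToyRecord(T value) { this.value = value; }
}

final class ToyWatermark extends ToyElement {
    final long timestamp;
    ToyWatermark(long timestamp) { this.timestamp = timestamp; }
}

final class ToyLatencyMarker extends ToyElement {
    final long markedTime;
    ToyLatencyMarker(long markedTime) { this.markedTime = markedTime; }
}

// Hypothetical one-input operator: one callback per element kind.
interface ToyOneInputOperator<IN> {
    void processElement(ToyRecord<IN> element);
    void processWatermark(ToyWatermark mark);
    void processLatencyMarker(ToyLatencyMarker marker);
}

// Hypothetical driver that routes each incoming element to the matching callback.
final class ToyInputProcessor {
    static <IN> void dispatch(ToyElement e, ToyOneInputOperator<IN> op) {
        if (e instanceof ToyRecord) {
            @SuppressWarnings("unchecked")
            ToyRecord<IN> record = (ToyRecord<IN>) e;
            op.processElement(record);
        } else if (e instanceof ToyWatermark) {
            op.processWatermark((ToyWatermark) e);
        } else if (e instanceof ToyLatencyMarker) {
            op.processLatencyMarker((ToyLatencyMarker) e);
        } else {
            throw new IllegalArgumentException("unknown element kind: " + e);
        }
    }
}

// An operator that only records which callback was invoked and with what.
final class LoggingOperator implements ToyOneInputOperator<String> {
    final List<String> log = new ArrayList<>();

    @Override
    public void processElement(ToyRecord<String> element) {
        log.add("record:" + element.value);
    }

    @Override
    public void processWatermark(ToyWatermark mark) {
        log.add("watermark:" + mark.timestamp);
    }

    @Override
    public void processLatencyMarker(ToyLatencyMarker marker) {
        log.add("latency:" + marker.markedTime);
    }
}

class ToyDispatchDemo {
    public static void main(String[] args) {
        LoggingOperator op = new LoggingOperator();
        ToyInputProcessor.dispatch(new ToyRecord<String>("a"), op);
        ToyInputProcessor.dispatch(new ToyWatermark(5L), op);
        ToyInputProcessor.dispatch(new ToyLatencyMarker(7L), op);
        System.out.println(op.log); // prints [record:a, watermark:5, latency:7]
    }
}
```

A two-input operator would follow the same pattern with a separate set of callbacks per input, which is why the two interfaces look so alike.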
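```java
@PublicEvolving
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

    // Handles one data record arriving on the input.
    void processElement(StreamRecord<IN> element) throws Exception;

    // Handles a Watermark arriving on the input.
    void processWatermark(Watermark mark) throws Exception;

    // Handles a LatencyMarker arriving on the input.
    void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}
```

StreamMap & StreamFlatMap

Common operations such as map and filter are all OneInputStreamOperators. StreamMap and StreamFlatMap are shown below as concrete examples.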
```java
// StreamMap as an example of a real operator
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Apply the user's map function and reuse the incoming record as the output container.
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

// StreamFlatMap as an example of a real operator
@Internal
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // Every record emitted by the user function carries the timestamp of the input record.
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}
```

Generating the StreamGraph

The StreamGraph is generated when the program reaches the line env.execute("Java WordCount from SocketTextStream Example"). It represents the topology of the program and is the graph built directly from the user's code.
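Analysis of the StreamGraph generation function

The actual entry point for generating the StreamGraph is StreamGraphGenerator.generate(env, transformations) (the listing that follows uses the no-argument instance form, generate()). Here transformations is a list that records the operators we registered through the transform method. In the end, a transformXXX method is called to convert each concrete Transformation.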
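Before reading the real source, the recursive idea can be sketched in a few lines of plain Java. This is a toy model under stated assumptions, not Flink code: `ToyTransformation`, `ToyStreamGraph`, and `ToyGraphGenerator` are hypothetical names, each transformation has at most one input, and the cache map is a simplification added here so that an upstream shared by several entries in the list is converted only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified transformation: an id, a name, and at most one input.
final class ToyTransformation {
    final int id;
    final String name;
    final ToyTransformation input; // null for a source

    ToyTransformation(int id, String name, ToyTransformation input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }
}

// Hypothetical graph: nodes keyed by id (insertion-ordered) plus a list of {from, to} edges.
final class ToyStreamGraph {
    final Map<Integer, String> nodes = new LinkedHashMap<>();
    final List<int[]> edges = new ArrayList<>();

    void addOperator(int id, String name) { nodes.put(id, name); }

    void addEdge(int from, int to) { edges.add(new int[] {from, to}); }
}

final class ToyGraphGenerator {
    private final List<ToyTransformation> transformations;
    private final Map<Integer, Collection<Integer>> alreadyTransformed = new HashMap<>();
    private final ToyStreamGraph graph = new ToyStreamGraph();

    ToyGraphGenerator(List<ToyTransformation> transformations) {
        this.transformations = transformations;
    }

    ToyStreamGraph generate() {
        for (ToyTransformation t : transformations) {
            transform(t);
        }
        return graph;
    }

    private Collection<Integer> transform(ToyTransformation t) {
        Collection<Integer> cached = alreadyTransformed.get(t.id);
        if (cached != null) {
            return cached; // this node was already added through another path
        }
        // 1. Recursively convert the upstream first.
        Collection<Integer> inputIds =
                t.input == null ? Collections.<Integer>emptyList() : transform(t.input);
        // 2. Add the current node.
        graph.addOperator(t.id, t.name);
        // 3. Connect it to every upstream node id.
        for (Integer inputId : inputIds) {
            graph.addEdge(inputId, t.id);
        }
        Collection<Integer> result = Collections.singleton(t.id);
        alreadyTransformed.put(t.id, result);
        return result;
    }
}

class ToyGraphDemo {
    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation(1, "source", null);
        ToyTransformation flatMap = new ToyTransformation(2, "flatMap", source);
        ToyTransformation sink = new ToyTransformation(3, "sink", flatMap);

        // In this toy the source is not in the list; it is reached only through flatMap's input.
        ToyStreamGraph g = new ToyGraphGenerator(Arrays.asList(flatMap, sink)).generate();

        System.out.println("nodes: " + g.nodes); // prints nodes: {1=source, 2=flatMap, 3=sink}
        for (int[] e : g.edges) {
            System.out.println(e[0] + " -> " + e[1]); // prints 1 -> 2, then 2 -> 3
        }
    }
}
```

The order of work in `transform` (upstream first, then the node, then its edges) is the same three-step shape that transformOneInputTransform follows in the generator source.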
```java
@Internal
public class StreamGraphGenerator {

    private final List<Transformation<?>> transformations;

    private StreamGraph streamGraph;

    public StreamGraph generate() {
        // Note: StreamGraph generation starts from the sinks; each transform(...) call
        // recursively processes its upstream inputs first.
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);

        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;
        return builtStreamGraph;
    }

    private Collection<Integer> transform(Transformation<?> transform) {
        // The core of this method is to determine which kind of Transformation was passed in
        // and run the matching handler; see the long if-else chain below.
        // In short, it recursively adds this node and its upstream nodes to the graph.
        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
        }
        .......
    }

    // Because common operations such as map and filter are all OneInputStreamOperators,
    // we look at StreamGraphGenerator.transformOneInputTransform((OneInputTransformation<?, ?>) transform).
    // It first recursively transforms the upstream transform so that everything upstream is
    // already converted, then builds a StreamNode from this transform, and finally connects it
    // to the upstream node by building a StreamEdge.
    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        // Recursively process the input; the rest of the method adds the current node
        // to the graph and creates the edges from its upstream nodes.
        Collection<Integer> inputIds = transform(transform.getInput());

        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

        // Add the operator to the streamGraph here.
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        // Fall back to the environment-wide parallelism when none is set on the transformation.
        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                ? transform.getParallelism()
                : executionConfig.getParallelism();
        streamGraph.setParallelism(transform.getId(), parallelism);
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        // One edge from every upstream node to this node.
        for (Integer inputId : inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }

        return Collections.singleton(transform.getId());
    }
}
```

streamGraph.addOperator

The graph-generation code above calls streamGraph.addOperator. Let's look at how it is implemented.