Spark RDD深度解析-RDD计算流程 (3)

其中splitRDD的分区,firstParent是父RDD;最外层的f其实是构造MapPartitionsRDD时传入的一个参数,改参数是一个函数对象,接收三个参数并返回Iterator

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)

f是何时生成的呢?就看何时生成的MapPartitionsRDD,参考上文可知MapPartitionsRDD是在map方法里构造的第二个构造参数就是f的具体实现。

new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

综上可知,MapPartitionsRDDcomputef的作用就是就是对f的第三个参数iter执行iter.map(cleanF),其中cleanF就是用户调用map时传入的函数,而iter又firstParent[T].iterator(split, context)的返回值。

firstParent[T].iterator(split, context)又是什么呢?他是对父RDD执行iterator方法,该方法是RDD接口的final方法,因此所有子RDD调用的都是该方法。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

通过进一步查看可知,iterator先判断 RDD storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该 Partition 对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算来获取。

Iterator方法将返回一个迭代器,通过迭代器可以访问父RDD个分区的每个元素,如果内存中不存在父RDD的数据,则调用父RDDcompute方法进行计算。

RDD真正的计算由RDDaction 操作触发,对于action 操作之前的所有Transformation 操作,Spark只记录Transformation的RDD生成轨迹,即各个RDD之间的相互依赖关系。

 

总结

Spark RDD的计算方式为:spark是从最后一个RDD开始计算(调用compute),计算时寻找父RDD,若父RDD在内存就直接使用,否则调用父RDDcompute计算得出,以此递归,过程可抽象为下图:

Spark RDD深度解析-RDD计算流程

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzfwj.html