其中split是RDD的分区,firstParent是父RDD;最外层的f其实是构造MapPartitionsRDD时传入的一个参数,改参数是一个函数对象,接收三个参数并返回Iterator
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
f是何时生成的呢?就看何时生成的MapPartitionsRDD,参考上文可知MapPartitionsRDD是在map方法里构造的,其第二个构造参数就是f的具体实现。
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
综上可知,MapPartitionsRDD的compute中f的作用就是就是对f的第三个参数iter执行iter.map(cleanF),其中cleanF就是用户调用map时传入的函数,而iter又是firstParent[T].iterator(split, context)的返回值。
firstParent[T].iterator(split, context)又是什么呢?他是对父RDD执行iterator方法,该方法是RDD接口的final方法,因此所有子RDD调用的都是该方法。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
通过进一步查看可知,iterator先判断 RDD 的 storageLevel 是否为 NONE,若不是,则尝试从缓存中读取,读取不到则通过计算来获取该 Partition 对应的数据的迭代器;若是,尝试从 checkpoint 中获取 Partition 对应数据的迭代器,若 checkpoint 不存在则通过计算来获取。
Iterator方法将返回一个迭代器,通过迭代器可以访问父RDD的某个分区的每个元素,如果内存中不存在父RDD的数据,则调用父RDD的compute方法进行计算。
RDD真正的计算由RDD的action 操作触发,对于action 操作之前的所有Transformation 操作,Spark只记录Transformation的RDD生成轨迹,即各个RDD之间的相互依赖关系。
总结
Spark RDD的计算方式为:spark是从最后一个RDD开始计算(调用compute),计算时寻找父RDD,若父RDD在内存就直接使用,否则调用父RDD的compute计算得出,以此递归,过程可抽象为下图: