Spark RDD深度解析-RDD计算流程 (2)

首先从直观上了解上述代码执行过程中RDD的转换,如下图,Spark按照HDFS中文件的block将数据加载到内存,成为初始RDD1,经过每一步操作后转换为相应RDD

Spark RDD深度解析-RDD计算流程

 

首先分析textFile方法的作用,源码如下:

 

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

着重看红色语句,textFile方法实际上是调用了hadoopFile方法,再利用其返回值调用map方法,HadoopFile执行了什么,返回了什么呢?

def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

很明显,hadoopFile实际上是获取了HADOOP的配置,然后构造并返回了HadoopRDD对象,HadoopRDDRDD的子类。因此textFile最后调用的是HadoopRDD对象的map方法,其实RDD接口中定义并实现了map方法,所有继承了RDD的类调用的map方法都来自于此。

观察RDDmap方法:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

map方法很简单,首先包装一下传进来的函数,然后返回MapPartitionsRDD对象。至此,textFile结束,他最终只是返回了MapPartitionsRDD,并没有执行数据读取、计算操作。

 

 接着看下一语句:var rdd = hdfs_rdd.map(_.split(“,”));

由上面的分析可知hdfs_rdd是一个MapPartitionsRDD对象,于是其map方法内容与上文的一模一样,也只是返回一个包含用户函数的MapPartitionsRDD对象。

目前为止每个方法的调用只是返回不同类型的RDD对象,还未真正执行计算。

 

 

 

 

接着var cnt = rdd.count();

count是一种action类型的操作,会触发RDD的计算,为什么说count会触发RDD的计算呢?需要看看count的实现:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

可以看到,count方法中调用了scsparkContext)的runJob方法,该操作将触发DagScheduler去分解任务并提交到集群执行。count方法会返回Array[U]类型的结果,数组中每个值代表了当前RDD每个分区中包含的元素个数,因此sum的结果就是RDD中所有元素的个数,本例的结果就是HDFS文件中存在几行数据。

 

RDD的计算  

下面介绍任务提交后RDD是怎么计算出来的。

任务分解并提交后开始执行,task最后一个RDD上执行compute方法

以上述代码为例,最后一个RDD的类型是MapPartitionsRDD,看其compute方法:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzfwj.html