spark 源码分析之十九 -- DAG的生成和Stage的划分 (2)

spark 源码分析之十九 -- DAG的生成和Stage的划分

这只是定义在RDD抽象父类中的默认方法,不同的子类会有不同的实现。

它在如下类中又重新实现了这个方法,如下:

spark 源码分析之十九 -- DAG的生成和Stage的划分

是否是shuffle依赖,跟分区的数量也有一定的关系,具体可以看下面的几个RDD的依赖的实现:

CoGroupedRDD

spark 源码分析之十九 -- DAG的生成和Stage的划分

 

SubtractedRDD

spark 源码分析之十九 -- DAG的生成和Stage的划分

DAG在Spark作业中的重要性

如下图,一个application的执行过程被划分为四个阶段:

阶段一:我们编写driver程序,定义RDD的action和transformation操作。这些依赖关系形成操作的DAG。

阶段二:根据形成的DAG,DAGScheduler将其划分为不同的stage。

阶段三:每一个stage中有一个TaskSet,DAGScheduler将TaskSet交给TaskScheduler去执行,TaskScheduler将任务执行完毕之后结果返回给DAGSCheduler。

阶段四:TaskScheduler将任务分发到每一个Worker节点去执行,并将结果返回给TaskScheduler。

 

本篇文章的定位就是阶段一和阶段二。后面会介绍阶段三和阶段四。

spark 源码分析之十九 -- DAG的生成和Stage的划分

注:图片不知出处。

DAG的创建

我们先来分析一个top N案例。

一个真实的TopN案例

需求:一个大文件里有很多的重复整数,现在求出重复次数最多的前10个数。

代码如下(为了多几个stage,特意加了几个repartition):

scala> val sourceRdd = sc.textFile("/tmp/hive/hive/result",10).repartition(5)
sourceRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at repartition at <console>:27

scala> val allTopNs = sourceRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).repartition(10).sortByKey(ascending = true, 100).map(tup => (tup._2, tup._1)).mapPartitions(
| iter => {
| iter.toList.sortBy(tup => tup._1).takeRight(100).iterator
| }
| ).collect()

// 结果略
scala> val finalTopN = scala.collection.SortedMap.empty[Int, String].++(allTopNs)
//结果略

scala> finalTopN.takeRight(10).foreach(tup => {println(tup._2 + " occurs times : " + tup._1)})

53 occurs times : 1070
147 occurs times : 1072
567 occurs times : 1073
931 occurs times : 1075
267 occurs times : 1077
768 occurs times : 1080
612 occurs times : 1081
877 occurs times : 1082
459 occurs times : 1084
514 occurs times : 1087

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwwwzz.html