这只是定义在RDD抽象父类中的默认方法,不同的子类会有不同的实现。
它在如下类中又重新实现了这个方法,如下:
是否是shuffle依赖,跟分区的数量也有一定的关系,具体可以看下面的几个RDD的依赖的实现:
CoGroupedRDD SubtractedRDD DAG在Spark作业中的重要性如下图,一个application的执行过程被划分为四个阶段:
阶段一:我们编写driver程序,定义RDD的action和transformation操作。这些依赖关系形成操作的DAG。
阶段二:根据形成的DAG,DAGScheduler将其划分为不同的stage。
阶段三:每一个stage中有一个TaskSet,DAGScheduler将TaskSet交给TaskScheduler去执行,TaskScheduler将任务执行完毕之后结果返回给DAGSCheduler。
阶段四:TaskScheduler将任务分发到每一个Worker节点去执行,并将结果返回给TaskScheduler。
本篇文章的定位就是阶段一和阶段二。后面会介绍阶段三和阶段四。
注:图片不知出处。
DAG的创建我们先来分析一个top N案例。
一个真实的TopN案例需求:一个大文件里有很多的重复整数,现在求出重复次数最多的前10个数。
代码如下(为了多几个stage,特意加了几个repartition):
scala> val sourceRdd = sc.textFile("/tmp/hive/hive/result",10).repartition(5)
sourceRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at repartition at <console>:27
scala> val allTopNs = sourceRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).repartition(10).sortByKey(ascending = true, 100).map(tup => (tup._2, tup._1)).mapPartitions(
|
iter => {
|
iter.toList.sortBy(tup => tup._1).takeRight(100).iterator
|
}
|
).collect()
// 结果略
scala> val finalTopN = scala.collection.SortedMap.empty[Int, String].++(allTopNs)
//结果略
scala> finalTopN.takeRight(10).foreach(tup => {println(tup._2 + " occurs times : " + tup._1)})
53 occurs times : 1070
147 occurs times : 1072
567 occurs times : 1073
931 occurs times : 1075
267 occurs times : 1077
768 occurs times : 1080
612 occurs times : 1081
877 occurs times : 1082
459 occurs times : 1084
514 occurs times : 1087