Spark RDD深度解析-RDD计算流程
摘要 RDD(Resilient Distributed Datasets)是Spark的核心数据结构,所有数据计算操作均基于该结构进行,包括Spark sql 、Spark Streaming。理解RDD有助于了解分布式计算引擎的基本架构,更好地使用Spark进行批处理与流计算。本文以Spark2.0源代码为主,对RDD的生成、计算流程、加载顺序等作深入的解析。
RDD印象直观上,RDD可理解为下图所示结构,即RDD包含多个Partition(分区),每个Partition代表一部分数据并位于一个计算节点。
RDD本质上是Spark中的一个抽象类,所有子RDD(HadoopRDD、MapPartitionRDD、JdbcRDD等)都要继承并实现其中的方法。
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
RDD包含以下成员方法或属性:
1、compute方法
提供在计算过程中Partition元素的获取与计算方式
2、partition的列表
每一个partition代表一个并行的最小划分单元;
3、dependencies列表
描述RDD依赖哪些父RDD生成,即RDD的血缘关系;
4、partition的位置列表
定义如何最快速的获取partition的数据,加快计算,这个是可选的,可作为本地化计算的优化选项;
5、partitioner方法
定义如何对数据进行分区。
RDD生成方式1、scala集合
Partition的默认值:defaultParallelism
defaultParallelism与spark的部署模式相关:
Local 模式:本机 cpu cores 的数量
Mesos 模式:8
Yarn:max(2, 所有 executors 的 cpu cores 个数总和)
2、物理数据载入
默认为min(defaultParallelism, 2)
3、其他RDD转换
根据具体的转换算子而定
PartitionPartiton不直接持有数据,仅仅代表了分区的位置(index的值)。
trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
从名字可以猜想,他描述了RDD之间的依赖关系。成员rdd就是父RDD,会在构造RDD时被赋值。
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
由上述RDD、Dependcy关系可画出下图,通过这种方式,子RDD能轻易找到父RDD的位置等信息,从而构建出RDD的转换路径,为DAGScheduler的任务划分及任务执行时寻找依赖的数据提供依据。
到此应该能大致明白RDD中涉及的各个概念的含义及其之间的联系。但是仔细思考,会发现存在很多问题,比如:
既然RDD不携带数据,那么数据是何时加载的?怎么加载的?怎么分布到不同计算节点的?
不同类型的RDD是怎么完成转换的?
RDD计算流程
以下面几行代码为例,解答上述问题。
var sc = new SparkContext();
var hdfs_rdd = sc.textFile(hdfs://master:9000/examples/people.txt); // 加载数据
var rdd = hdfs_rdd.map(_.split(“,”)); // 对每行数据按逗号分隔
print(rdd.count()); // 打印数据的条数
RDD的转换