Spark 中所有的 transformations 都是 lazy(懒加载的), 因此它不会立刻计算出结果. 相反, 他们只记得应用于一些基本数据集的转换 (例如. 文件). 只有当需要返回结果给驱动程序时,transformations 才开始计算. 这种设计使 Spark 的运行更高效. 例如, 我们可以了解到,map 所创建的数据集将被用在 reduce 中,并且只有 reduce 的计算结果返回给驱动程序,而不是映射一个更大的数据集.
默认情况下,每次你在 RDD 运行一个 action 的时, 每个 transformed RDD 都会被重新计算。但是,您也可用 persist (或 cache) 方法将 RDD persist(持久化)到内存中;在这种情况下,Spark 为了下次查询时可以更快地访问,会把数据保存在集群上。此外,还支持持续持久化 RDDs 到磁盘,或复制到多个结点。
Jobs / Stage作业执行原理
作业(Job):RDD每一个行动操作都会生成一个或者多个调度阶段。
调度阶段(Stage):每个Job都会根据依赖关系,以Shuffle过程作为划分,分为Shuffle Map Stage和Result Stage。每个Stage包含多个任务集(TaskSet),TaskSet的数量与分区数相同。
任务(Task):分发到Executor上的工作任务,是Spark的最小执行单元。
DAGScheduler:DAGScheduler是面向调度阶段的任务调度器,负责划分调度阶段并提交给TaskScheduler。
TaskScheduler:TaskScheduler是面向任务的调度器,它负责将任务分发到Woker节点,由Executor进行执行。
每一次行动操作都会触发SparkContext的runJob方法进行作业的提交。
这些作业之间可以没有任何依赖关系,对于多个作业之间的调度,共有两种:一种是默认的FIFO模式,另一种则是FAIR模式,该模式的调度可以通过设定minShare(最小任务数)和weight(任务的权重)来决定Job执行的优先级。
FIFO调度策略:优先比较作业优先级(作业编号越小优先级越高),再比较调度阶段优先级(调度阶段编号越小优先级越高)。
FAIR调度策略:先获取两个调度的饥饿程度,是否处于饥饿状态由当前正在运行的任务是否小于最小任务决定,获取后进行如下比较:
优先满足处于饥饿状态的调度
同处于饥饿状态,优先满足资源比小的调度
同处于非饥饿状态,优先满足权重比小的调度
以上情况均相同的情况下,根据调度名称进行排序
划分调度阶段(DAG构建)DAG的构建:主要是通过对最后一个RDD进行递归,使用广度优先遍历每个RDD跟父RDD的依赖关系(前面提到子RDD会记录依赖关系),碰到ShuffleDependency的则进行切割。切割后形成TaskSet传递给TaskScheduler进行执行。
DAG的作用:让窄依赖的RDD操作合并为同一个TaskSet,将多个任务进行合并,有利于任务执行效率的提高。
TaskSet结构图:假设数据有两个Partition时,TaskSet是一组关联的,但相互之间没有Shuffle依赖关系的Task集合,TaskSet的ShuffleMapStage数量跟Partition个数相关,主要包含task的集合,stage中的rdd信息等等。Task会被序列化和压缩
Spark里的某些操作会触发shuffle。shuffle是Spark重新分配数据的一种机制,使得这些数据可以跨不同的区域进行分组,通常涉及在executors和机器之间拷贝数据,使得shuffle成为一个复杂的、代价高的操作。
实例说明为了明白 reduceByKey 操作的过程,我们以 reduceByKey 为例。
数据处理:文件在hdfs中以多个切片形式存储,读取时每一个切片会被分配给一个Excutor进行处理;
map端操作:map端对文件数据进行处理,格式化为(key,value)键值对,每个map都可能包含a,b,c,d等多个字母,如果在map端使用了combiner,则数据会被压缩,value值会被合并;(注意:这个过程的使用需要保证对最终结果没有影响,有利于减少shuffle过程的数据传输);
reduce端操作:reduce过程中,假设a和b,c和d在同一个reduce端,需要将map端被分配在同一个reduce端的数据进行洗牌合并,这个过程被称之为shuffle。