[下图是 SparkShell 作业提交后打印的日志信息]
我们得出的结论是 DAGScheduler 划分好 Stage 之后会通过 TaskSchedulerImpl 中的 TaskSetManager 来管理当前要运行 Stage 中所有的任务 TaskSet,它是一个包含了高层调度器与底层调度器的一个集合。TaskSet 的第一个成员是一个数组;第二个成员表示自己属于那一个 Stage,第三个成员是 StageAttemptId,第四个是优先值,调度时底层有一个调度池,这个调度池会规定每个 Stage 提交后具体运行的优先级。
TaskSetManager 在实例化的时候要完成 TaskSchedulerImpl 的工作,因为它是 TaskSet 的管理者,所以它其中的一个成员肯定是 TaskSet,还有一个成员是每个任务最大的重试次数。TaskSetManager 会根据 locality aware 来为 Task 分配计算资源、监控 Task 的执行状态 (例如重试、慢任务进行推测式执行等,调度的时候底层有一个调度池)
TaskScheduler 与 SchedulerBackend 之间的关系他们两者之间的关系是一个是高层调度器、一个是底层调度器;一个负责 Stage 的划分、一个是负责把任务发送给 Executor 去执行并接收运行结果。
应用程序的资源分配在应用程序启动时已经完成,现在要考虑的是具体应用程序中每个任务到底要运行在那个 ExecutorBackend 上,现在是任务的分配。TaskScheduler 要负责为 Task 分配计算资源:此时程序已经分配好集群中的计算资源了,然后会根据计算本地性原则来确定 Task 具体要运行在那个 ExecutorBackend 中:
这里有两种不同的 Task,一种是 ShuffleMapTask,一种是 ResultMapTask
[下图是 DAGScheduler.scala 中 submitMissingTasks 方法中内部具体的实现]
DAGScheduler 完成面向 Stage 的划分之后,会按照顺序将每个 Stage 通过 TaskSchedulerImpl 的 Submit Task 提交给底层调度器 (提交作业啦!!!) TaskSchedulerImpl.submitTasks: 主要的作用是將 TaskSet 加入到 TaskSetManager 中進行管理;
[下图是 DAGScheduler.scala 中 submitMissingTasks 方法中内部具体的实现]
高层调度器 DAGScheduler 提交了任务是通过调用 submitTask 方法提交 TaskSet 给底层调度器,然后赋值给一个变量 Task,同时创建了一个 TaskSetManager 的实例,这个很关键,它传入了 taskSchedulerImpl 对象本身、TaskSet 和最大失败后自动重试的次数。
[下图是 TaskSchedulerImpl.scala 中 submitTasks 方法]
[下图是 TaskSchedulerImpl.scala 中 createTaskSetManager 方法]
创建 SparkContext 中调用了 createTaskScheduler 来创建 TaskSchedulerImpl 的实例,默认作业失败后自动重试的次数是 4 次。
[下图是 SparkContext.scala 中创建三大核心对象的代码实现]
[下图是 TaskSchedulerImpl.scala 中类和主构造器]
比较关键的地方是调用了 schedulableBuilder 中的 addTaskSetManager,SchedulableBuilder 本身是应用程序级别的调度器,它自己支持两种调度模式。SchedulableBuilder 会确定 TaskSetManager 的调度顺序,然后按照 TaskSetManager 的 locality aware 来确定每个 Task 具体运行在那个 ExecutorBackend 中;补充说明:schedulableBuilder 是在创建 TaskSchedulerImpl 时实例化的。
[下图是 SchedulableBuilder.scala 中的方法]
一种是FIFO; 另一种是FAIR,调度策略可以通过 spark-env.sh 中的 spark.scheduler.mode 进行具体的设置,默认情况下是 FIFO
[下图是 SparkContext.scala 中 createTaskScheduler 方法内部具体的实现]
[下图是 TaskScheduler.Impl 中 initialize 方法]
[下图是 TaskScheduler.Impl.scala 中 schedulingMode 变量的具体实现]