从第3步 submitTask 方法中最后调用了 backend.revivOffers 方法。这是 CoarseGrainedSchedulerBackend.reviveOffers: 给 DrivereEndpoint 发送 ReviveOffers,DriverEndPoint 是驱动程序的调度器;
[下图是 CoarseGrainedSchedulerBackend.scala 中 reviveOffers 方法]
[下图是 CoarseGrainedSchedulerBackend.scala 中 DriverEndPoint 类里的 start 方法]
[下图是 CoarseGrainedSchedulerBackend.scala 中 DriverEndPoint 类]
[下图是 CoarseGrainedSchedulerBackend.scala 中 DriverEndPoint 类里的 receive 方法内部具体的实现]
ReviveOffers 本身是一个空的 case object 对象,只是起到触发底层资源触发调度的作用,在有 Task 提交或者计算资源变动的时候会发送 ReviveOffers 这个消息作为触发器;
[下图是 CoarseGrainedClusterMessage.scala 中 ReviveOffers case object]
在 DriverEndpoint 接受 ReviveOffers 消息并路由到 makeOffers 具体的方法中;在 makeOffers 方法中首先准备好所有可以用于计算的 Executor,然后找出可以的 workOffers (代表了所有可用 ExecutorBackend 中可以使用的 CPU Cores 信息)WorkerOffer 会告我们具体 Executor 可用的资源,比如说 CPU Core,为什么此时不考虑内存只考虑 CPU Core,因为在这之前已经分配好了。
[下图是 CoarseGrainedSchedulerBackend.scala 中 makeOffers 方法]
[下图是 WorkerOffer.scala 中 WorkerOffer case class]
而确定 Task 具体运行在哪个 ExecutorBackend 上的算法是由 TaskSetManager 的 resourceOffer 的方法決定。TaskScheduerImpl.resourceOffers: 为每一个Task 具体分配计算资源,输入是 ExcutorBackend 及其上可用的 Cores,输出 TaskDescription 的二位数组,在其中确定了每个 Task 具体运行在哪个 ExecutorBackend: resourceOffers 到底是如何确定 Task 具体运行在那个 ExecutorBackend 上的呢?算法的实现具体如下:
[下图是 TaskSchedulerImpl.scala 中 resourceOffers 方法]
[下图是 TaskSchedulerImpl.scala 中 resourceOffers 方法内部具体的实现]
通过 Random.shuffle 方法重新洗牌所有的计算以寻找以计算的负载均衡;
根据每个 ExecutorBackend 的 cores 的个数声明类行为 TaskDescription 的 ArrayBuffer 数组
打散的是 Executor 的资源,这样有随机性,随机性有利于负载均衡;