FairScheduler job初始化过程源码浅析(4)

12.SchedulingAlgorithms.computeFairShares:返回到该方法,全总体来看其实这里是一个算法(好吧,这个类本身就是一个算法类),这个算法旨在找出一个合适的fairShare值,使得所有job的权重*fairShare之和最接近cap值(Math.min(totalDemand, totalSlots)),这是一个二分查找算法,至于这样做的原因可以参见SchedulingAlgorithms.computeFairShares的注释,大致的意思好像是这个值叫做weighted fair sharing,We call R the weight-to-slots ratio because it converts a Schedulable's weight to the number of slots it is assigned。也就是这个可以根据这个值*pool的权重得到该pool所分配到的slot数量。就到这,英语不好看不太懂,等别人解释吧。最后sched.setFairShare(computeShare(sched, right))将这个值设置到PoolSchedulable中。

for (Schedulable sched: schedulables) {
      totalDemand += sched.getDemand();
    }
    double cap = Math.min(totalDemand, totalSlots);
    double rMax = 1.0;
    while (slotSUSEdWithWeightToSlotRatio(rMax, schedulables) < cap) {
      rMax *= 2.0;
    }
    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
    double left = 0;
    double right = rMax;
    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
      double mid = (left + right) / 2.0;
      if (slotsUsedWithWeightToSlotRatio(mid, schedulables) < cap) {
        left = mid;
      } else {
        right = mid;
      }
    }

13.FairScheduler.update():后面是对每个JobSchedulable使用同上方法计算一个fairShare值,意义是为pool中的每个job的可分配的slot数量。这里同样会计算job的weigth,job的权重是由FairScheduler计算得到的,在计算权重时,可以选择是否开启根据job长度调整权重(由mapred.fairscheduler.sizebasedweight参数控制,默认false),然后根据job的优先级判断相应的权重,其对应关系:优先级:VERY_HIGH-权重:4.0/HIGH:2.0/NORMAL:1.0/LOW:0.5,最后根据weightAdjuster进行调整job的权重,需要手动实现,由mapred.fairscheduler.weightadjuster参数设置,如果你自定义了一个weightAdjuster类,则可以通过重写adjustWeight()方法控制job的权重。总之默认情况下一个job的权重只是取决于该Job优先级。后面的跳过,不是太懂

14.FairScheduler.update():最后是判断是否支持抢占机制,即当一个资源池资源有剩余是否允许将剩余资源共享给其他资源池。具体是判断每个资源池中正在运行的任务是否小于资源池本身最小资源量或者需求量,同时还判断该资源池是否急于将资源共享给其他资源,即资源使用量低于共享量的一半。

private void updatePreemptionVariables() {
    long now = clock.getTime();
    lastPreemptionUpdateTime = now;
    for (TaskType type: MAP_AND_REDUCE) {
      for (PoolSchedulable sched: getPoolSchedulables(type)) {
        if (!isStarvedForMinShare(sched)) {
          sched.setLastTimeAtMinShare(now);
        }
        if (!isStarvedForFairShare(sched)) {
          sched.setLastTimeAtHalfFairShare(now);
        }
        eventLog.log("PREEMPT_VARS", sched.getName(), type,
            now - sched.getLastTimeAtMinShare(),
            now - sched.getLastTimeAtHalfFairShare());
      }
    }
  }

到此,整个FairScheduler的任务初始化操作或者说JobListener的jobAdded()方法完成了,分析的有遗漏,也许还有错误,如您发现请不吝赐教,谢谢

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/b863d2b6f31e8e2afdef814c23e98022.html