Hadoop之MapReduce框架心跳机制分析(2)

下面描述一下上面流程图中的几个重要过程:

(1)过程一,判断是否达到心跳间隔。

TaskTracker的心跳间隔是由task完成情况以及整个集群规模规模动态觉得的。

Task完成情况完成对心跳的动态调

为了提高系统资源的利用效率和任务的相应的时间,MapReduce框架提供了一种基于已经运行完毕的任务数的机制用于动态地缩短TaskTracker的发送心跳间隔,从源码看,这种机制叫做“outOfBand”。当存在某个Task运行完成或者失败,TaskTracker会马上缩短心跳间隔以更快的速度将Task运行完成或者失败的消息告诉JobTracker,让其重新快速分配任务。具体的实现机制我们来看源码分析:

源码定位到TaskTracker中的offerService方法

State offerService() throws Exception {
    //上一次发心跳距现在时间
    long lastHeartbeat = System.currentTimeMillis();
    //此循环主要根据控制完成task个数控制心跳间隔。
    while (running && !shuttingDown) {
      try {
        long now = System.currentTimeMillis();//获得当前时间
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         
        // accelerate to account for multiple finished tasks up-front
      //通过完成的任务数动态控制心跳间隔时间 
      long remaining =
          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        while (remaining > 0) {
          // sleeps for the wait time or
          // until there are *enough* empty slots to schedule tasks
          synchronized (finishedCount) {
            finishedCount.wait(remaining);
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
            // Recompute
            now = System.currentTimeMillis();
            remaining =
              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             
            if (remaining <= 0) {
              // Reset count
              finishedCount.set(0);//将已经完成的Task个数计数器归零
    ....

上面代码的第九行,就是用来实现根据Task的运行完成或者失败数目来动态的缩短心跳间隔。其中finishedCount.get()用于获得获得已经运行完毕的Task的计数。再来看看这个计数是怎么incream的,定位到TaskTracker类的notifyTTAboutTaskCompletion方法:

/**
  * Notify the tasktracker to send an out-of-band heartbeat.
  */
  private void notifyTTAboutTaskCompletion() {
    if (oobHeartbeatOnTaskCompletion) {//判断是否启动“外带心跳”配置(默认为false)
      synchronized (finishedCount) {
        finishedCount.incrementAndGet();//运行完毕的Task计数器自增
        finishedCount.notify();
      }
    }
  }

其中oobHeartbeatOnTaskCompletion可以由mapreduce.tasktracker.outofband.heartbeat配置(默认为false),也就是说要当启动“外带心跳”时,才会启动根据Task完成或者失败数来动态调整心跳间隔机制。下面看看动态调整心跳的具体算法,进入getHeartbeatInterval(finishedCount.get())方法:

private long getHeartbeatInterval(int numFinishedTasks) {
    return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
  }

其中,numFinishedTasks代码已经运行完成或者失败的Task数目,oobHeartbeatDamper简称“心跳收缩因子”由mapreduce.tasktracker.outofband.heartbeat.damper配置(默认为1000000)。当    启动外带心跳机制时,如果某个时刻有numFinishedTasks个任务运行完成,则心跳间隔就会调整为(heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1))。当不启动“外带心跳”机制时numFinishedTasks默认就为0了,那么整个心跳间隔还是heartbeatInterval。
过程二,判断TaskTracker是否第一次启动

当到达心跳间隔后发送心跳前,会判断TaskTracker是否是第一次启动,如果是第一次启动的话则会检测当前的TaskTracker版本是否和JobTracker的版本是否一致,如果版本号一致才会向JobTracker发送心跳。看看源代码,还是TaskTracker类:

......
  if(justInited) {//第一次启动justInited默认为true
          String jobTrackerBV = jobClient.getBuildVersion();//获得JobTracker的版本号
          if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {//获得TaskTracker的版本号,并且判断JobTracker和TaskTracker的版本是否一致
            String msg = "Shutting down. Incompatible buildVersion." +
            "\nJobTracker's: " + jobTrackerBV +
            "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
....
justInited = false;//TaskTracker启动后将其设置为false
......

justInited默认为true,当TaskTracker初次启动后会被改为false。当TaskTracker初次启动,进入检测TaskTracker和JobTracker版本一致性环节。跟进代码中看看是如何判断版本一致性的,
/**
  * Returns the buildVersion which includes version,
  * revision, user and date.
  */
  public static String getBuildVersion(){
    return VersionInfo.getVersion() +
    " from " + VersionInfo.getRevision() +
    " by " + VersionInfo.getUser() +
    " source checksum " + VersionInfo.getSrcChecksum();
  }

上面代码是获得JobTracker和TaskTracker版本号的返回格式字符串,getVersion()返回Hadoop版本号,getRevision()返回Hadoop的修订版本号,getUser()返回代码编译用户,getSrcChecksum()返回校验和。验证版本的一致性就是验证这些。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/176210ad19147bb1f2409d52c130c462.html