Hadoop之MapReduce框架心跳机制分析(3)

过程三,检测磁盘是否读写是否正常。

MapReduce框架中,在map任务计算过程中会将输出结果保存在mapred.local.dir指定的本地目录中(可以由多块磁盘组成,配置的时候用逗号隔开),这些本地目录是没有备份的(不像HDFS上有副本)一旦丢失或者损害整个Map任务需要重新进行计算。TaskTracker初始化时会对这些目录进行一次检测,并将正常的目录保存起来。之后,TaskTracker会周期性(由mapred.disk.healthChecker.interval配置,默认60s)地对这些正常目录进行检测,如果发现故障目录,TaskTracker就会重新对自己进行初始化。看看源代码,定位到TaskTracker的offerService方法:

......
        now = System.currentTimeMillis();
        if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {//判断是否达到检测磁盘的时间间隔
          localStorage.checkDirs();//检测硬盘读写是否正常
          lastCheckDirsTime = now;
          int numFailures = localStorage.numFailures();//出现读写错误的目录数
          // Re-init the task tracker if there were any new failures
          if (numFailures > lastNumFailures) {//检测本次检测中是否存在损害目录
            lastNumFailures = numFailures;
            return State.STALE;//硬盘读写检测错误,返回需要从新初始化状态
          }
        }
......

其中,diskHealthCheckInterval代表检测磁盘的时间间隔,由mapred.disk.healthChecker.interval配置,默认60s。

过程四,发送心跳。


TaskTracker将当前节点运行时信息,例如TaskTracker基本情况、资源使用情况、任务运行状态等,通过心跳信息向JobTracker进行汇报,同时接受来自JobTracker的各种指令。

// Send the heartbeat and process the jobtracker's directives
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正向JobTracker发送心跳

TaskTracker基本情况、资源使用情况、任务运行状态等信息会被封装到一个可序列化的类TaskTrackerStatus中,并会伴随心跳发送给JobTracker。每次发送心跳时,TaskTracker根据最新的信息重新构造TaskTrackerStatus。但是从源代码看并不是每次心跳都会发送节点资源信息申请新任务,看代码:

// Check if we should ask for a new Task
  //
  boolean askForNewTask;
  long localMinSpaceStart;
  //存在空闲的map或者reduce slot,并且map输出目录大于mapred.local.dir.minspackekill才去向JobTracker发送节点资源使用情况申请新任务。
  synchronized (this) {
    askForNewTask =
      ((status.countOccupiedMapSlots() < maxMapSlots ||
        status.countOccupiedReduceSlots() < maxReduceSlots) &&
      acceptNewTasks);
    localMinSpaceStart = minSpaceStart;
  }
  if (askForNewTask) {
    askForNewTask = enoughFreeSpace(localMinSpaceStart);//判断map中间结果输出路径空间
    long freeDiskSpace = getFreeSpace();//获得map中间结果输出大小
    long totVmem = getTotalVirtualMemoryOnTT();//获得虚拟内存总量
    long totPmem = getTotalPhysicalMemoryOnTT();//获得物理内存总量
    long availableVmem = getAvailableVirtualMemoryOnTT();//获得可用虚拟内存量
    long availablePmem = getAvailablePhysicalMemoryOnTT();//获得可用物理内存量
    long cumuCpuTime = getCumulativeCpuTimeOnTT();//获得TaskTracker自从集群启动到现在的累计使用时间
    long cpuFreq = getCpuFrequencyOnTT();//获得cpu频率
    int numCpu = getNumProcessorsOnTT();//获得cpu核心数
    float cpuUsage = getCpuUsageOnTT();//获得cpu使用率
  //将这些资源信息封装到TaskTracker中的resStatus对象(ResourceStatus类实例)
    status.getResourceStatus().setAvailableSpace(freeDiskSpace);
    status.getResourceStatus().setTotalVirtualMemory(totVmem);
    status.getResourceStatus().setTotalPhysicalMemory(totPmem);
    status.getResourceStatus().setMapSlotMemorySizeOnTT(
        mapSlotMemorySizeOnTT);
    status.getResourceStatus().setReduceSlotMemorySizeOnTT(
        reduceSlotSizeMemoryOnTT);
    status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
    status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
    status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
    status.getResourceStatus().setCpuFrequency(cpuFreq);
    status.getResourceStatus().setNumProcessors(numCpu);
    status.getResourceStatus().setCpuUsage(cpuUsage);
  }

只有当存在空闲的map或者reduce slot,并且map输出目录大于mapred.local.dir.minspackekill才会将上面的节点资源信息放到TaskTrackerStatus中,向JobTracker发送节点资源使用情况申请新任务。再来看看通过心跳传给JobTracker的TaskTrackerStatus封装的具信息:

String trackerName;//taskTracker名称
  String host;//主机名
  int httpPort;//TaskTracker对外的http端口
  int failures;//TaskTracker上运行失败的任务总数
  List<TaskStatus> taskReports;//记录了当前TaskTracker上各个任务的运行状态
                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
  volatile long lastSeen;//上次汇报心跳的时间
  private int maxMapTasks;//map slot总数
  private int maxReduceTasks;//reduce slot总数
  private TaskTrackerHealthStatus healthStatus;//记录TaskTracker的健康情况
...
  private ResourceStatus resStatus;////TaskTracker资源信息,包括cpu、虚拟内存、物理内存等信息

以下主要说说healthStatus变量,以及针对这个数据结构的节点健康监测机制。(注意:该部分来自参考文献[1],p176~p177)
 healthStatus保存了节点的健康情况,该变量对应TaskTrackerHealthStatus类,结构如下:

static class TaskTrackerHealthStatus implements Writable {
    private boolean isNodeHealthy;//节点是否健康
    private String healthReport;//如果节点不健康,则记录导致不健康的原因
    private long lastReported;//上一次汇报健康状态的时间
...}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/176210ad19147bb1f2409d52c130c462.html