Hadoop1.2.1源码解析系列:JT与TT之间的心跳通信机制(2)

该方法中涉及到runningTasks队列,该队列保存了该TT上接收的所有未完成的Task任务,通过runningTasks.values()可以获取TT当前所有未完成的Task,然后获取每个TaskInProgress的status信息,同时根据第一步判断出的sendCounters(true/false)决定是否发送counters信息(includeCounters),即是否将counters对象序列化到TaskStatus对象中,这里需要注意如果TaskInProgress不处于Running状态,则includeCounters设为true,即发送counters信息。

--------------------------------------分割线 --------------------------------------

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)

Ubuntu下Hadoop环境的配置

单机版搭建Hadoop环境图文教程详解

Hadoop LZO 安装教程

Hadoop集群上使用Lzo压缩

--------------------------------------分割线 --------------------------------------

3.TaskTrackerStatus():

public TaskTrackerStatus(String trackerName, String host,
                          int httpPort, List<TaskStatus> taskReports,
                          int taskFailures, int dirFailures,
                          int maxMapTasks, int maxReduceTasks) {
    this.trackerName = trackerName;
    this.host = host;
    this.httpPort = httpPort;

this.taskReports = new ArrayList<TaskStatus>(taskReports);
    this.taskFailures = taskFailures;
    this.dirFailures = dirFailures;
    this.maxMapTasks = maxMapTasks;
    this.maxReduceTasks = maxReduceTasks;
    this.resStatus = new ResourceStatus();
    this.healthStatus = new TaskTrackerHealthStatus();
  }

这里只是进行简单的变量复制操作,分析下其中一些参数的含义:

1)taskReports:包含该TT上目前所有的Task状态信息,其中的counters信息会根据之前判断sendCounters值进行决定是否发送,上一步有提到。

2)taskFailures:该TT上失败的Task总数(重启会清空),该参数帮助JT决定是否向该TT提交Task,因为失败数越多表明该TT可能出现Task失败的概率越大。

3)dirFailures:这个值是mapred.local.dir参数设置的目录中有多少是不可用的(以后会详细提到)

4)maxMapSlots/maxReduceSlots:这个值是TT可使用的最大map和reduce slot数量

初始化完成,继续回到TaskTracker.transmitHeartBeat方法。

4.TaskTracker.transmitHeartBeat:

// Check if we should ask for a new Task
    //
    boolean askForNewTask;
    long localMinSpaceStart;
    synchronized (this) {
      askForNewTask =
        ((status.countOccupiedMapSlots() < maxMapSlots ||
          status.countOccupiedReduceSlots() < maxReduceSlots) &&
        acceptNewTasks);
      localMinSpaceStart = minSpaceStart;
    }
    if (askForNewTask) {
      askForNewTask = enoughFreeSpace(localMinSpaceStart);
      long freeDiskSpace = getFreeSpace();
      long totVmem = getTotalVirtualMemoryOnTT();
      long totPmem = getTotalPhysicalMemoryOnTT();
      long availableVmem = getAvailableVirtualMemoryOnTT();
      long availablePmem = getAvailablePhysicalMemoryOnTT();
      long cumuCpuTime = getCumulativeCpuTimeOnTT();
      long cpuFreq = getCpuFrequencyOnTT();
      int numCpu = getNumProcessorsOnTT();
      float cpuUsage = getCpuUsageOnTT();

status.getResourceStatus().setAvailableSpace(freeDiskSpace);
      status.getResourceStatus().setTotalVirtualMemory(totVmem);
      status.getResourceStatus().setTotalPhysicalMemory(totPmem);
      status.getResourceStatus().setMapSlotMemorySizeOnTT(
          mapSlotMemorySizeOnTT);
      status.getResourceStatus().setReduceSlotMemorySizeOnTT(
          reduceSlotSizeMemoryOnTT);
      status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
      status.getResourceStatus().setCpuFrequency(cpuFreq);
      status.getResourceStatus().setNumProcessors(numCpu);
      status.getResourceStatus().setCpuUsage(cpuUsage);
    }

从源码中的注释可以知道,此处是TT根据自身资源使用情况判断是否接收new task。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/62516c0392a42080fbd76bd24d9549b5.html