该方法中涉及到runningTasks队列,该队列保存了该TT上接收的所有未完成的Task任务,通过runningTasks.values()可以获取TT当前所有未完成的Task,然后获取每个TaskInProgress的status信息,同时根据第一步判断出的sendCounters(true/false)决定是否发送counters信息(includeCounters),即是否将counters对象序列化到TaskStatus对象中,这里需要注意如果TaskInProgress不处于Running状态,则includeCounters设为true,即发送counters信息。
--------------------------------------分割线 --------------------------------------
Ubuntu 13.04上搭建Hadoop环境
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置
--------------------------------------分割线 --------------------------------------
3.TaskTrackerStatus():
public TaskTrackerStatus(String trackerName, String host,
int httpPort, List<TaskStatus> taskReports,
int taskFailures, int dirFailures,
int maxMapTasks, int maxReduceTasks) {
this.trackerName = trackerName;
this.host = host;
this.httpPort = httpPort;
this.taskReports = new ArrayList<TaskStatus>(taskReports);
this.taskFailures = taskFailures;
this.dirFailures = dirFailures;
this.maxMapTasks = maxMapTasks;
this.maxReduceTasks = maxReduceTasks;
this.resStatus = new ResourceStatus();
this.healthStatus = new TaskTrackerHealthStatus();
}
这里只是进行简单的变量复制操作,分析下其中一些参数的含义:
1)taskReports:包含该TT上目前所有的Task状态信息,其中的counters信息会根据之前判断sendCounters值进行决定是否发送,上一步有提到。
2)taskFailures:该TT上失败的Task总数(重启会清空),该参数帮助JT决定是否向该TT提交Task,因为失败数越多表明该TT可能出现Task失败的概率越大。
3)dirFailures:这个值是mapred.local.dir参数设置的目录中有多少是不可用的(以后会详细提到)
4)maxMapSlots/maxReduceSlots:这个值是TT可使用的最大map和reduce slot数量
初始化完成,继续回到TaskTracker.transmitHeartBeat方法。
4.TaskTracker.transmitHeartBeat:
// Check if we should ask for a new Task
//
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask =
((status.countOccupiedMapSlots() < maxMapSlots ||
status.countOccupiedReduceSlots() < maxReduceSlots) &&
acceptNewTasks);
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
askForNewTask = enoughFreeSpace(localMinSpaceStart);
long freeDiskSpace = getFreeSpace();
long totVmem = getTotalVirtualMemoryOnTT();
long totPmem = getTotalPhysicalMemoryOnTT();
long availableVmem = getAvailableVirtualMemoryOnTT();
long availablePmem = getAvailablePhysicalMemoryOnTT();
long cumuCpuTime = getCumulativeCpuTimeOnTT();
long cpuFreq = getCpuFrequencyOnTT();
int numCpu = getNumProcessorsOnTT();
float cpuUsage = getCpuUsageOnTT();
status.getResourceStatus().setAvailableSpace(freeDiskSpace);
status.getResourceStatus().setTotalVirtualMemory(totVmem);
status.getResourceStatus().setTotalPhysicalMemory(totPmem);
status.getResourceStatus().setMapSlotMemorySizeOnTT(
mapSlotMemorySizeOnTT);
status.getResourceStatus().setReduceSlotMemorySizeOnTT(
reduceSlotSizeMemoryOnTT);
status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
status.getResourceStatus().setCpuFrequency(cpuFreq);
status.getResourceStatus().setNumProcessors(numCpu);
status.getResourceStatus().setCpuUsage(cpuUsage);
}
从源码中的注释可以知道,此处是TT根据自身资源使用情况判断是否接收new task。