过程三,检测磁盘是否读写是否正常。
MapReduce框架中,在map任务计算过程中会将输出结果保存在mapred.local.dir指定的本地目录中(可以由多块磁盘组成,配置的时候用逗号隔开),这些本地目录是没有备份的(不像HDFS上有副本)一旦丢失或者损害整个Map任务需要重新进行计算。TaskTracker初始化时会对这些目录进行一次检测,并将正常的目录保存起来。之后,TaskTracker会周期性(由mapred.disk.healthChecker.interval配置,默认60s)地对这些正常目录进行检测,如果发现故障目录,TaskTracker就会重新对自己进行初始化。看看源代码,定位到TaskTracker的offerService方法:
......
now = System.currentTimeMillis();
if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {//判断是否达到检测磁盘的时间间隔
localStorage.checkDirs();//检测硬盘读写是否正常
lastCheckDirsTime = now;
int numFailures = localStorage.numFailures();//出现读写错误的目录数
// Re-init the task tracker if there were any new failures
if (numFailures > lastNumFailures) {//检测本次检测中是否存在损害目录
lastNumFailures = numFailures;
return State.STALE;//硬盘读写检测错误,返回需要从新初始化状态
}
}
......
其中,diskHealthCheckInterval代表检测磁盘的时间间隔,由mapred.disk.healthChecker.interval配置,默认60s。
过程四,发送心跳。
TaskTracker将当前节点运行时信息,例如TaskTracker基本情况、资源使用情况、任务运行状态等,通过心跳信息向JobTracker进行汇报,同时接受来自JobTracker的各种指令。
// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正向JobTracker发送心跳
TaskTracker基本情况、资源使用情况、任务运行状态等信息会被封装到一个可序列化的类TaskTrackerStatus中,并会伴随心跳发送给JobTracker。每次发送心跳时,TaskTracker根据最新的信息重新构造TaskTrackerStatus。但是从源代码看并不是每次心跳都会发送节点资源信息申请新任务,看代码:
// Check if we should ask for a new Task
//
boolean askForNewTask;
long localMinSpaceStart;
//存在空闲的map或者reduce slot,并且map输出目录大于mapred.local.dir.minspackekill才去向JobTracker发送节点资源使用情况申请新任务。
synchronized (this) {
askForNewTask =
((status.countOccupiedMapSlots() < maxMapSlots ||
status.countOccupiedReduceSlots() < maxReduceSlots) &&
acceptNewTasks);
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
askForNewTask = enoughFreeSpace(localMinSpaceStart);//判断map中间结果输出路径空间
long freeDiskSpace = getFreeSpace();//获得map中间结果输出大小
long totVmem = getTotalVirtualMemoryOnTT();//获得虚拟内存总量
long totPmem = getTotalPhysicalMemoryOnTT();//获得物理内存总量
long availableVmem = getAvailableVirtualMemoryOnTT();//获得可用虚拟内存量
long availablePmem = getAvailablePhysicalMemoryOnTT();//获得可用物理内存量
long cumuCpuTime = getCumulativeCpuTimeOnTT();//获得TaskTracker自从集群启动到现在的累计使用时间
long cpuFreq = getCpuFrequencyOnTT();//获得cpu频率
int numCpu = getNumProcessorsOnTT();//获得cpu核心数
float cpuUsage = getCpuUsageOnTT();//获得cpu使用率
//将这些资源信息封装到TaskTracker中的resStatus对象(ResourceStatus类实例)
status.getResourceStatus().setAvailableSpace(freeDiskSpace);
status.getResourceStatus().setTotalVirtualMemory(totVmem);
status.getResourceStatus().setTotalPhysicalMemory(totPmem);
status.getResourceStatus().setMapSlotMemorySizeOnTT(
mapSlotMemorySizeOnTT);
status.getResourceStatus().setReduceSlotMemorySizeOnTT(
reduceSlotSizeMemoryOnTT);
status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
status.getResourceStatus().setCpuFrequency(cpuFreq);
status.getResourceStatus().setNumProcessors(numCpu);
status.getResourceStatus().setCpuUsage(cpuUsage);
}
只有当存在空闲的map或者reduce slot,并且map输出目录大于mapred.local.dir.minspackekill才会将上面的节点资源信息放到TaskTrackerStatus中,向JobTracker发送节点资源使用情况申请新任务。再来看看通过心跳传给JobTracker的TaskTrackerStatus封装的具信息:
String trackerName;//taskTracker名称
String host;//主机名
int httpPort;//TaskTracker对外的http端口
int failures;//TaskTracker上运行失败的任务总数
List<TaskStatus> taskReports;//记录了当前TaskTracker上各个任务的运行状态
volatile long lastSeen;//上次汇报心跳的时间
private int maxMapTasks;//map slot总数
private int maxReduceTasks;//reduce slot总数
private TaskTrackerHealthStatus healthStatus;//记录TaskTracker的健康情况
...
private ResourceStatus resStatus;////TaskTracker资源信息,包括cpu、虚拟内存、物理内存等信息
以下主要说说healthStatus变量,以及针对这个数据结构的节点健康监测机制。(注意:该部分来自参考文献[1],p176~p177)
healthStatus保存了节点的健康情况,该变量对应TaskTrackerHealthStatus类,结构如下:
static class TaskTrackerHealthStatus implements Writable {
private boolean isNodeHealthy;//节点是否健康
private String healthReport;//如果节点不健康,则记录导致不健康的原因
private long lastReported;//上一次汇报健康状态的时间
...}