下面描述一下上面流程图中的几个重要过程:
(1)过程一,判断是否达到心跳间隔。
TaskTracker的心跳间隔是由task完成情况以及整个集群规模规模动态觉得的。
Task完成情况完成对心跳的动态调
为了提高系统资源的利用效率和任务的相应的时间,MapReduce框架提供了一种基于已经运行完毕的任务数的机制用于动态地缩短TaskTracker的发送心跳间隔,从源码看,这种机制叫做“outOfBand”。当存在某个Task运行完成或者失败,TaskTracker会马上缩短心跳间隔以更快的速度将Task运行完成或者失败的消息告诉JobTracker,让其重新快速分配任务。具体的实现机制我们来看源码分析:
源码定位到TaskTracker中的offerService方法
State offerService() throws Exception {
//上一次发心跳距现在时间
long lastHeartbeat = System.currentTimeMillis();
//此循环主要根据控制完成task个数控制心跳间隔。
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();//获得当前时间
// accelerate to account for multiple finished tasks up-front
//通过完成的任务数动态控制心跳间隔时间
long remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
while (remaining > 0) {
// sleeps for the wait time or
// until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
finishedCount.wait(remaining);
// Recompute
now = System.currentTimeMillis();
remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
if (remaining <= 0) {
// Reset count
finishedCount.set(0);//将已经完成的Task个数计数器归零
....
上面代码的第九行,就是用来实现根据Task的运行完成或者失败数目来动态的缩短心跳间隔。其中finishedCount.get()用于获得获得已经运行完毕的Task的计数。再来看看这个计数是怎么incream的,定位到TaskTracker类的notifyTTAboutTaskCompletion方法:
/**
* Notify the tasktracker to send an out-of-band heartbeat.
*/
private void notifyTTAboutTaskCompletion() {
if (oobHeartbeatOnTaskCompletion) {//判断是否启动“外带心跳”配置(默认为false)
synchronized (finishedCount) {
finishedCount.incrementAndGet();//运行完毕的Task计数器自增
finishedCount.notify();
}
}
}
其中oobHeartbeatOnTaskCompletion可以由mapreduce.tasktracker.outofband.heartbeat配置(默认为false),也就是说要当启动“外带心跳”时,才会启动根据Task完成或者失败数来动态调整心跳间隔机制。下面看看动态调整心跳的具体算法,进入getHeartbeatInterval(finishedCount.get())方法:
private long getHeartbeatInterval(int numFinishedTasks) {
return (heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1));
}
其中,numFinishedTasks代码已经运行完成或者失败的Task数目,oobHeartbeatDamper简称“心跳收缩因子”由mapreduce.tasktracker.outofband.heartbeat.damper配置(默认为1000000)。当 启动外带心跳机制时,如果某个时刻有numFinishedTasks个任务运行完成,则心跳间隔就会调整为(heartbeatInterval / (numFinishedTasks * oobHeartbeatDamper + 1))。当不启动“外带心跳”机制时numFinishedTasks默认就为0了,那么整个心跳间隔还是heartbeatInterval。
过程二,判断TaskTracker是否第一次启动
当到达心跳间隔后发送心跳前,会判断TaskTracker是否是第一次启动,如果是第一次启动的话则会检测当前的TaskTracker版本是否和JobTracker的版本是否一致,如果版本号一致才会向JobTracker发送心跳。看看源代码,还是TaskTracker类:
......
if(justInited) {//第一次启动justInited默认为true
String jobTrackerBV = jobClient.getBuildVersion();//获得JobTracker的版本号
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {//获得TaskTracker的版本号,并且判断JobTracker和TaskTracker的版本是否一致
String msg = "Shutting down. Incompatible buildVersion." +
"\nJobTracker's: " + jobTrackerBV +
"\nTaskTracker's: "+ VersionInfo.getBuildVersion();
....
justInited = false;//TaskTracker启动后将其设置为false
......
justInited默认为true,当TaskTracker初次启动后会被改为false。当TaskTracker初次启动,进入检测TaskTracker和JobTracker版本一致性环节。跟进代码中看看是如何判断版本一致性的,
/**
* Returns the buildVersion which includes version,
* revision, user and date.
*/
public static String getBuildVersion(){
return VersionInfo.getVersion() +
" from " + VersionInfo.getRevision() +
" by " + VersionInfo.getUser() +
" source checksum " + VersionInfo.getSrcChecksum();
}
上面代码是获得JobTracker和TaskTracker版本号的返回格式字符串,getVersion()返回Hadoop版本号,getRevision()返回Hadoop的修订版本号,getUser()返回代码编译用户,getSrcChecksum()返回校验和。验证版本的一致性就是验证这些。