TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:
State offerService() throws Exception {
long lastHeartbeat = 0;
//TaskTracker进行是一直存在的
while (running && !shuttingDown) {
……
long now = System.currentTimeMillis();
//每隔一段时间就向JobTracker发送heartbeat
long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
synchronized(finishedCount) {
if (finishedCount[0] == 0) {
finishedCount.wait(waitTime);
}
finishedCount[0] = 0;
}
}
……
//发送Heartbeat到JobTracker,得到response
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
……
//从Response中得到此TaskTracker需要做的事情
TaskTrackerAction[] actions = heartbeatResponse.getActions();
……
if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
//如果是运行一个新的Task,则将Action添加到任务队列中
addToTaskQueue((LaunchTaskAction)action);
} else if (action instanceof CommitTaskAction) {
CommitTaskAction commitAction = (CommitTaskAction)action;
if (!commitResponses.contains(commitAction.getTaskID())) {
commitResponses.add(commitAction.getTaskID());
}
} else {
tasksToCleanup.put(action);
}
}
}
}
return State.NORMAL;
}
其中transmitHeartBeat主要逻辑如下:
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
//每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息
boolean sendCounters;
if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}
……
//报告给JobTracker,此TaskTracker的当前状态
if (status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
maxCurrentMapTasks,
maxCurrentReduceTasks);
}
}
……
//当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:
//当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数
//当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
status.countReduceTasks() < maxCurrentReduceTasks) &&
acceptNewTasks;
localMinSpaceStart = minSpaceStart;
}
……
//向JobTracker发送heartbeat,这是一个RPC调用
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);
……
return heartbeatResponse;
}