Hadoop学习总结:Map(3)

TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了offerService函数:

/**
 * Main service loop of the TaskTracker (abridged excerpt; elided code is
 * marked with "……").
 *
 * While {@code running} is true and {@code shuttingDown} is false, each
 * iteration waits out the remainder of the heartbeat interval, sends a
 * heartbeat through {@code transmitHeartBeat}, and dispatches the actions
 * carried by the response.
 *
 * @return {@code State.NORMAL} once the loop exits
 * @throws Exception declared by the signature; the visible code does not
 *         catch anything thrown inside the loop
 */
State offerService() throws Exception {

// NOTE(review): the update of lastHeartbeat is not visible in this excerpt;
// presumably it is refreshed in an elided section after each heartbeat.
long lastHeartbeat = 0;

  // The TaskTracker process stays alive: keep looping until it is stopped
  // or is shutting down.

while (running && !shuttingDown) {

……

long now = System.currentTimeMillis();

      // Send a heartbeat to the JobTracker at a regular interval:
      // waitTime is what remains of heartbeatInterval since lastHeartbeat.

long waitTime = heartbeatInterval - (now - lastHeartbeat);

if (waitTime > 0) {

// Block for at most waitTime on the finishedCount monitor, but only if
// finishedCount[0] is still 0; the counter is reset to 0 afterwards.
// NOTE(review): presumably another thread increments finishedCount[0] and
// notifies when a task finishes, cutting the wait short; confirm.
synchronized(finishedCount) {

if (finishedCount[0] == 0) {

finishedCount.wait(waitTime);

}

finishedCount[0] = 0;

}

}

……

      // Send the heartbeat to the JobTracker and obtain the response.

HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

……

     // Extract from the response what this TaskTracker is asked to do.

TaskTrackerAction[] actions = heartbeatResponse.getActions();

……

if (actions != null){

for(TaskTrackerAction action: actions) {

if (action instanceof LaunchTaskAction) {

            // A request to run a new task: add the action to the task queue.

addToTaskQueue((LaunchTaskAction)action);

} else if (action instanceof CommitTaskAction) {

// A commit request: record the task ID in commitResponses, skipping IDs
// that are already present.
CommitTaskAction commitAction = (CommitTaskAction)action;

if (!commitResponses.contains(commitAction.getTaskID())) {

commitResponses.add(commitAction.getTaskID());

}

} else {

// Every other action type is handed to the tasksToCleanup collection.
tasksToCleanup.put(action);

}

}

}

}

return State.NORMAL;

}

 

其中transmitHeartBeat主要逻辑如下:

/**
 * Builds this TaskTracker's status and sends one heartbeat to the JobTracker
 * (abridged excerpt; elided code is marked with "……").
 *
 * @param now timestamp supplied by the caller, compared against
 *            {@code previousUpdate + COUNTER_UPDATE_INTERVAL} to decide
 *            whether counters are included in this heartbeat
 * @return the response returned by {@code jobClient.heartbeat(...)}
 * @throws IOException declared by the signature (NOTE(review): presumably
 *         raised by the heartbeat call; confirm)
 */
private HeartbeatResponse transmitHeartBeat(long now) throws IOException {

  // Periodically, the heartbeat also carries statistics (counters) back to
  // the JobTracker: do so once more than COUNTER_UPDATE_INTERVAL has passed
  // since previousUpdate, and record the new update time.

boolean sendCounters;

if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {

sendCounters = true;

previousUpdate = now;

}

else {

sendCounters = false;

}

……

  // Report this TaskTracker's current state to the JobTracker. A new
  // TaskTrackerStatus is built only when status is currently null.

if (status == null) {

synchronized (this) {

status = new TaskTrackerStatus(taskTrackerName, localHostname,

httpPort,

cloneAndResetRunningTaskStatuses(

sendCounters),

failures,

maxCurrentMapTasks,

maxCurrentReduceTasks);

}

}

……

  // This TaskTracker asks the JobTracker to assign it a new task when
  // acceptNewTasks is true and at least one of the following holds:

  //   - the number of map tasks in status is below maxCurrentMapTasks

  //   - the number of reduce tasks in status is below maxCurrentReduceTasks

boolean askForNewTask;

long localMinSpaceStart;

synchronized (this) {

askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||

status.countReduceTasks() < maxCurrentReduceTasks) &&

acceptNewTasks;

// Snapshot of minSpaceStart taken under the lock; its use is not shown in
// this excerpt.
localMinSpaceStart = minSpaceStart;

}

……

  // Send the heartbeat to the JobTracker; this is an RPC call.

HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,

justStarted, askForNewTask,

heartbeatResponseId);

……

return heartbeatResponse;

}

 

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/ppfzg.html