MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁。TaskTracker周期性地调用心跳RPC函数,汇报节点和任务运行状态信息。MapReduce框架中通过心跳机制可以实现给TaskTracker分配任务、使JobTracker能够及时获取各个节点的资源使用情况和任务运行状态信息、判断TaskTracker的死活。本文主要从JobTracker和TaskTracker通信双方的角度分别去分析他们之间的心跳通信机制。
2、TaskTracker端心跳机制
JobTracker和TaskTracker之前的心跳模式采取了“拉”的方式,JobTracker不会主动向各个TaskTracker发送心跳信息,而是各个TaskTracker主动向JobTracker发送信息,同时领取JobTracker返回心跳包的各种命令。
TaskTracker中有一个run方法,其维护了一个无限循环用于通过心跳发送任务运行状态信息和接收JobTracker通过心跳返回的命令信息。其代码结构大概如下:
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
* loops when the old TaskTracker has gone bad (its state is
* stale somehow) and we need to reinitialize everything.
*/
public void run() {
try {
getUserLogManager().start();
startCleanupThreads();
boolean denied = false;
while (running && !shuttingDown && !denied) {
boolean staleState = false;
try {
// This while-loop attempts reconnects if we get network errors
while (running && !staleState && !shuttingDown && !denied) {
try {
State osState = offerService();
if (osState == State.STALE) {
staleState = true;
} else if (osState == State.DENIED) {
denied = true;
}
....}
其中,用于处理心跳相关信息的服务函数offerService代码大体框架:
/**
* Main service loop. Will stay in this loop forever.
*/
State offerService() throws Exception {
long lastHeartbeat = System.currentTimeMillis();//上一次发心跳距现在时间
////此循环主要根据控制完成task个数控制心跳间隔。
while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();//获得当前时间
// accelerate to account for multiple finished tasks up-front
//通过完成的任务数动态控制心跳间隔时间
long remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
while (remaining > 0) {
// sleeps for the wait time or
// until there are *enough* empty slots to schedule tasks
synchronized (finishedCount) {
finishedCount.wait(remaining);
// Recompute
now = System.currentTimeMillis();
remaining =
(lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
if (remaining <= 0) {
// Reset count
finishedCount.set(0);
break;
}
}
}
...
//发送心跳
// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正想JobTracker发送心跳
.....
//开始处理JobTracker返回的命令
TaskTrackerAction[] actions = heartbeatResponse.getActions();
...
//杀死一定时间没没有汇报进度的task
markUnresponsiveTasks();
//当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间
killOverflowingTasks();
从整个源码看,TaskTracker向JobTracker发送一次心跳的流程如下:
相关阅读:
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置
搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建)