Hadoop之MapReduce框架心跳机制分析

MapReduce框架中的master/slave心跳机制是整个集群运作的基础,是沟通TaskTracker和JobTracker的桥梁。TaskTracker周期性地调用心跳RPC函数,汇报节点和任务运行状态信息。MapReduce框架中通过心跳机制可以实现给TaskTracker分配任务、使JobTracker能够及时获取各个节点的资源使用情况和任务运行状态信息、判断TaskTracker的死活。本文主要从JobTracker和TaskTracker通信双方的角度分别去分析他们之间的心跳通信机制。

2、TaskTracker端心跳机制

JobTracker和TaskTracker之前的心跳模式采取了“拉”的方式,JobTracker不会主动向各个TaskTracker发送心跳信息,而是各个TaskTracker主动向JobTracker发送信息,同时领取JobTracker返回心跳包的各种命令。

TaskTracker中有一个run方法,其维护了一个无限循环用于通过心跳发送任务运行状态信息和接收JobTracker通过心跳返回的命令信息。其代码结构大概如下:

/**
  * The server retry loop.
  * This while-loop attempts to connect to the JobTracker.  It only
  * loops when the old TaskTracker has gone bad (its state is
  * stale somehow) and we need to reinitialize everything.
  */
  public void run() {
    try {
      getUserLogManager().start();
      startCleanupThreads();
      boolean denied = false;
      while (running && !shuttingDown && !denied) {
        boolean staleState = false;
        try {
          // This while-loop attempts reconnects if we get network errors
          while (running && !staleState && !shuttingDown && !denied) {
            try {
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
....}

其中,用于处理心跳相关信息的服务函数offerService代码大体框架:

/**
  * Main service loop.  Will stay in this loop forever.
  */
  State offerService() throws Exception {
    long lastHeartbeat = System.currentTimeMillis();//上一次发心跳距现在时间
  ////此循环主要根据控制完成task个数控制心跳间隔。
    while (running && !shuttingDown) {
      try {
        long now = System.currentTimeMillis();//获得当前时间                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     
        // accelerate to account for multiple finished tasks up-front
        //通过完成的任务数动态控制心跳间隔时间 
      long remaining =
          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        while (remaining > 0) {
          // sleeps for the wait time or
          // until there are *enough* empty slots to schedule tasks
          synchronized (finishedCount) {
            finishedCount.wait(remaining);                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
            // Recompute
            now = System.currentTimeMillis();
            remaining =
              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           
            if (remaining <= 0) {
              // Reset count
              finishedCount.set(0);
              break;
            }
          }
        }
...
//发送心跳
// Send the heartbeat and process the jobtracker's directives
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);//真正想JobTracker发送心跳
.....
//开始处理JobTracker返回的命令
TaskTrackerAction[] actions = heartbeatResponse.getActions();
...
//杀死一定时间没没有汇报进度的task
markUnresponsiveTasks();
//当剩余磁盘空间小于mapred.local.dir.minspacekill(默认为0)时,寻找合适的任务将其杀掉以释放空间
killOverflowingTasks();

从整个源码看,TaskTracker向JobTracker发送一次心跳的流程如下:

Hadoop之MapReduce框架心跳机制分析

相关阅读

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)

Ubuntu下Hadoop环境的配置

单机版搭建Hadoop环境图文教程详解

搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/176210ad19147bb1f2409d52c130c462.html