当launch一个job到Hadoop 集群,hadoop集群中的 TaskTracker(TT),在从一次heartbeat中接收到 JobTracker(JT)的cmd后,TT会在自己本地机器上运行一些task进程,以完成这个job的某些部分的数据 处理。在task运行过程中,或者 运行完成后,总是会或多或少的产生一些中间结果,或者jobCache的数据,这些数据通常都需要向TT的本地文件 系统 中写,写这些数据的目录是通过 hadoop的TT进程启动时在 hadoop-site.xml 中的 mapred.local.dir 选项来决定的。比如该配置选项如下:
<property>
<name>mapred.local.dir</name>
<value>/disk10/mapred/local,/disk11/mapred/local,/disk12/mapred/local,/disk13/mapred/local,/disk14/mapred/local</value>
</property>
那么其实这个TT的所有task,就有5个mapred的local目录写入点可供选择,通常每个 task运行之前,会从这些写入点中轮询的选择下一个,然后该task的中间结果等数据就往这个目录下写。写入的时候也不是直接就写到 这些写入点的根下去了,而是根据该task所属的jobid,taskid来创建相应的目录,并往这些目录中写。因为如果不这样,那么许多的task往写入点的根下写数据,并且文件名一样,就会乱了套了。在写入点中的对应的jobid和taskid创建的目录结构,TT的创建策略可以从其代码中看出来:
static String getLocalTaskDir(String jobid,
String taskid,
boolean isCleanupAttempt) {
String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
if (isCleanupAttempt) {
taskDir = taskDir + ".cleanup";
}
return taskDir;
}
static String getLocalJobDir(String jobid) {
return getJobCacheSubdir() + Path.SEPARATOR + jobid;
}
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
而
TaskTracker.SUBDIR = "taskTracker"
Path.SEPARATOR = “/”
TaskTracker.JOBCACHE = “jobcache”
所以,加入某个task得到的local写入点是 /disk10/mapred/local,那么它真正的local写入目录就是 : /disk10/mapred/local/taskTracker/jobcache/${jobid}/${taskid}/
这里要讲的是:当一个TT运行的时间非常的长,那么在其各个写入点下的临时文件或者垃圾文件就会越来越多,当这些数据越来越多,文件和目录越来越多时,TT和其上面运行的tasks就会受到影响,而致使job越来越慢。究其根源,可以在TT的源码中发现,其中有一个cleanup(boolean)的方法,这个方法用来对写入点下的临时文件和中间结果文件进行清理。但是问题在于,居然有两个方法在调用它,分别是:
startNewTask和 taskFinished(),也就是说,每当有一个task在TT上启动 ,或者每当有一个task运行结束,都会调用这个cleanup方法,而当TT运行时间太长时,这个cleanup的操作就会越来越慢,这就是导致通常的 hadoop集群总是会越跑越慢的原因之一。
找到了问题的根源,解决 起来就比较好办了,在startNewTask()和 taskFinished() 方法的时候,不调用cleanup() 方法,以节省task启动的开销,并在TT中启动一个专门用来清理这些数据目录的线程,在可配置的时间内,如果某些目录和文件的创建和修改时间超过了某个阈值,就删除这些目录和文件,问题就可以解决。
更多Hadoop相关信息见Hadoop 专题页面 ?tid=13