深度分析如何在Hadoop中控制Map的数量

很多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定。在默认情况下,最终input占据了多少block,就应该启动多少个Mapper。如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃。这些逻辑确实是正确的,但都是在默认情况下的逻辑。其实如果进行一些客户化的设置,就可以控制了。

Hadoop中,设置Map task的数量不像设置Reduce task数量那样直接,即:不能够通过API直接精确的告诉Hadoop应该启动多少个Map task。

你也许奇怪了,在API中不是提供了接口org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)吗?这个值难道不可以设置Map task的数量吗?这个API的确没错,在文档上解释”Note: This is only a hint to the framework.“,即这个值对Hadoop的框架来说仅仅是个提示,不起决定性的作用。也就是说,即便你设置了,也不一定得到你想要的效果。

更多Hadoop相关信息见Hadoop 专题页面 ?tid=13

1. InputFormat介绍

在具体设置Map task数量之前,非常有必要了解一下与Map-Reduce输入相关的基础知识。

这个接口(org.apache.hadoop.mapred.InputFormat)描述了Map-Reduce job的输入规格说明(input-specification),它将所有的输入文件分割成逻辑上的InputSplit,每一个InputSplit将会分给一个单独的mapper;它还提供RecordReader的具体实现,这个Reader从逻辑的InputSplit上获取input records并传给Mapper处理。

InputFormat有多种具体实现,诸如FileInputFormat(处理基于文件的输入的基础抽象类), DBInputFormat(处理基于数据库的输入,数据来自于一个能用SQL查询的表),KeyValueTextInputFormat(特殊的FineInputFormat,处理Plain Text File,文件由回车或者回车换行符分割成行,每一行由key.value.separator.in.input.line分割成Key和Value),CompositeInputFormat,DelegatingInputFormat等。在绝大多数应用场景中都会使用FileInputFormat及其子类型。

通过以上的简单介绍,我们知道InputFormat决定着InputSplit,每个InputSplit会分配给一个单独的Mapper,因此InputFormat决定了具体的Map task数量


2. FileInputFormat中影响Map数量的因素

在日常使用中,FileInputFormat是最常用的InputFormat,它有很多具体的实现。以下分析的影响Map数量的因素仅对FileInputFormat及其子类有效,其他非FileInputFormat可以去查看相应的 getSplits(JobConf job, int numSplits) 具体实现即可。

请看如下代码段(摘抄自org.apache.hadoop.mapred.FileInputFormat.getSplits,hadoop-0.20.205.0源代码):

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);   long minSize = Math.max(job.getLong("mapred.min.split.size"1), minSplitSize);      for (FileStatus file: files) {     Path path = file.getPath();     FileSystem fs = path.getFileSystem(job);     if ((length != 0) && isSplitable(fs, path)) {        long blockSize = file.getBlockSize();       long splitSize = computeSplitSize(goalSize, minSize, blockSize);              long bytesRemaining = length;       while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {         String[] splitHosts = getSplitHosts(blkLocations,length-bytesRemaining, splitSize, clusterMap);         splits.add(new FileSplit(path, length-bytesRemaining, splitSize, splitHosts));         bytesRemaining -= splitSize;       }          if (bytesRemaining != 0) {         splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts()));       }     } else if (length != 0) {       String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);       splits.add(new FileSplit(path, 0, length, splitHosts));     } else {        //Create empty hosts array for zero length files        splits.add(new FileSplit(path, 0, length, new String[0]));     }   }      return splits.toArray(new FileSplit[splits.size()]);      protected long computeSplitSize(long goalSize, long minSize, long blockSize) {       return Math.max(minSize, Math.min(goalSize, blockSize));   }  

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/c92a87d12a495fcf779d05b20acaf0b1.html