在实际使用PIG处理数据时,会经常要处理大批量的小文件。在这种情况下,如果不对Pig脚本进行任何特别设置,默认情况下很有可能会遇到类似这样的“命名空间超过配额限制”的错误:
org.apache.Hadoop.hdfs.protocol.NSQuotaExceededException: org.apache.hadoop.hdfs.protocol.NSQuotaExceededException: The NameSpace quota (directories and files) of directory /projects/user_grid is exceeded: quota=1000000 file count=1000001
或者如果你发现,你的Pig脚本运行结果会产生数量巨大的输出文件(通常在没有reduce时),比如几万甚至几十万个文件输出,则这条经验应该能解决你的问题。
出现这种问题的原因就是由于在处理数据过程中,Pig为每一个输入文件都创建了一个相应的mapper,每个mapper就会产生相应的一个输出文件。这种行为当然是正确的,也是Hadoop框架的设计所要求的,因为Hadoop框架会为每个data block创建一个mapper,如果一个文件小于dfs.block.size(默认为64M),则此文件会独占一个block,不与别的文件共享该data block。这种默认行为对于输入data block数量(或者文件数量)巨大的情况下,代价会非常昂贵,执行效率非常低。解决方案很自然就是将data block做适当的合并,然后为合并后的每个split创建一个mapper。
在Pig 0.80之后的版本,Pig提供了能够合并多个输入文件生成一个split的功能。当在Pig脚本中设置了
SET pig.splitCombination true; SET pig.maxCombinedSplitSize 134217728; -- 134217728 = 128M
之后,Pig在运行mapper之前,首先会把小于128M的小文件都合并成128M之后,再创建与之对应的mapper。如果没有设置maxCombinedSplitSize,则Pig会自动按照HDFS的block size合并小文件。如果需要将自动合并小文件的功能关闭,只需要 ‘SET pig.splitCombination false;’ 即可。根据目前我的经验,至少Pig 0.91版本默认pig.splitCombination为false。 从实现原理上讲,当设置了pig.splitCombination为true之后,Pig将使用CombinedInputFormat来读取输入,使用CombinedInputSplit实例而非默认的InputSplit实例。CombinedInputFormat用来代替即将弃用的MultiFileInputFormat,会基于data block的locality特性批量合并小文件。在Pig内部,如果输入Input的locality(位置)信息不可用,那么此接口也能正常工作。因此combined input format将有多个通用的split合并而成,直到合并的size达到pig.maxCombinedSplitSize或者dfs.split.size。然而由于在merge join table中排序的限制,split combination将不会用在任何有merge join的地方。但是在map阶段的cogroup和map阶段的group by,那些split会被合并,因为在map阶段的这些操作,仅仅要求被合并的数据保留重复的key,combine并没有影响到重复key这些要素。在合并过程中,在同一节点上的split尽可能的被合并,剩下未合并的split将不会考虑locality因素而拷贝到其他node上去合并。在每个结点上,将会采用贪婪的方法合并,最大的split会优先于小的split合并。
本地执行job占全部job的比例越高,则执行速度越快。在Job tracker的监控web页面中,会有一个summary页面显示该job的各项数据,在Job Counter一栏中有Rack-local map tasks、Launched map tasks、Data-local map tasks三项数据,分别表示在同一个机柜中执行的map task数目,启动的map task数目,本地执行的map task数目。在这个summary页面的上方,还会显示successful map tasks,表示最终成功执行的map task数目。我们可以简单的用 data-local map tasks / successful map tasks = 本地执行的map tasks的比例作为指标来衡量在map阶段的效率。
我测试了一下,输入是10T的二进制日志数据,做一般字段的抽取,输出为table分割的文本,没有reduce操作,hadoop集群有2773个Map slot可用测试结果如下:
maxCombineSplitSize
Data-local Map Tasks
Successful Map Tasks
Execution Duration
Ratio
3600000000
2026
3284
3H 19' 30''
2026/3807 = 0.532
4250000000
1580
2772
3H 28' 27''
1580/2805 = 0.563
2000000000
4283
5620
2H 31' 59''
4283/6396 = 0.6696
1000000000
8729
10964
2H 4' 39''
8729/12386 = 0.7047
512000000
20200
23806
1H 57' 4''
20200/23806 = 0.8489