Pig 调优实践经验总结

在实际使用PIG处理数据时,会经常要处理大批量的小文件。在这种情况下,如果不对Pig脚本进行任何特别设置,默认情况下很有可能会遇到类似这样的“命名空间超过配额限制”的错误:

org.apache.Hadoop.hdfs.protocol.NSQuotaExceededException: org.apache.hadoop.hdfs.protocol.NSQuotaExceededException:    The NameSpace quota (directories and files) of directory /projects/user_grid is exceeded: quota=1000000 file count=1000001  

或者如果你发现,你的Pig脚本运行结果会产生数量巨大的输出文件(通常在没有reduce时),比如几万甚至几十万个文件输出,则这条经验应该能解决你的问题。

出现这种问题的原因就是由于在处理数据过程中,Pig为每一个输入文件都创建了一个相应的mapper,每个mapper就会产生相应的一个输出文件。这种行为当然是正确的,也是Hadoop框架的设计所要求的,因为Hadoop框架会为每个data block创建一个mapper,如果一个文件小于dfs.block.size(默认为64M),则此文件会独占一个block,不与别的文件共享该data block。这种默认行为对于输入data block数量(或者文件数量)巨大的情况下,代价会非常昂贵,执行效率非常低。解决方案很自然就是将data block做适当的合并,然后为合并后的每个split创建一个mapper。

在Pig 0.80之后的版本,Pig提供了能够合并多个输入文件生成一个split的功能。当在Pig脚本中设置了

SET pig.splitCombination true;   SET pig.maxCombinedSplitSize 134217728; -- 134217728 = 128M  

后,Pig在运行mapper之前,首先会把小于128M的小文件都合并成128M之后,再创建与之对应的mapper。如果没有设置maxCombinedSplitSize,则Pig会自动按照HDFS的block size合并小文件。如果需要将自动合并小文件的功能关闭,只需要 ‘SET pig.splitCombination false;’ 即可。根据目前我的经验,至少Pig 0.91版本默认pig.splitCombination为false。 从实现原理上讲,当设置了pig.splitCombination为true之后,Pig将使用CombinedInputFormat来读取输入,使用CombinedInputSplit实例而非默认的InputSplit实例。CombinedInputFormat用来代替即将弃用的MultiFileInputFormat,会基于data block的locality特性批量合并小文件。在Pig内部,如果输入Input的locality(位置)信息不可用,那么此接口也能正常工作。因此combined input format将有多个通用的split合并而成,直到合并的size达到pig.maxCombinedSplitSize或者dfs.split.size。

然而由于在merge join table中排序的限制,split combination将不会用在任何有merge join的地方。但是在map阶段的cogroup和map阶段的group by,那些split会被合并,因为在map阶段的这些操作,仅仅要求被合并的数据保留重复的key,combine并没有影响到重复key这些要素。在合并过程中,在同一节点上的split尽可能的被合并,剩下未合并的split将不会考虑locality因素而拷贝到其他node上去合并。在每个结点上,将会采用贪婪的方法合并,最大的split会优先于小的split合并。

2. maxCombineSplitSize设置为多少效率最好?

本地执行job占全部job的比例越高,则执行速度越快。在Job tracker的监控web页面中,会有一个summary页面显示该job的各项数据,在Job Counter一栏中有Rack-local map tasks、Launched map tasks、Data-local map tasks三项数据,分别表示在同一个机柜中执行的map task数目,启动的map task数目,本地执行的map task数目。在这个summary页面的上方,还会显示successful map tasks,表示最终成功执行的map task数目。我们可以简单的用 data-local map tasks / successful map tasks = 本地执行的map tasks的比例作为指标来衡量在map阶段的效率。

我测试了一下,输入是10T的二进制日志数据,做一般字段的抽取,输出为table分割的文本,没有reduce操作,hadoop集群有2773个Map slot可用测试结果如下:

maxCombineSplitSize   Data-local Map Tasks   Successful Map Tasks   Execution Duration   Ratio  
3600000000   2026   3284   3H 19' 30''   2026/3807 = 0.532  
4250000000   1580   2772   3H 28' 27''   1580/2805 = 0.563  
2000000000   4283   5620   2H 31' 59''   4283/6396 = 0.6696  
1000000000   8729   10964   2H 4' 39''   8729/12386 = 0.7047  
512000000   20200   23806   1H 57' 4''   20200/23806 = 0.8489  

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/263800e64904c09bb56c49250f7b2167.html