HIVE作为在Hadoop分布式框架下的数据仓库技术,处理大数据量是最基本的诉求,这种海量处理是基于分布式框架,利用分布式存储,分布式计算,利用大集群的资源并行处理海量数据。但是一旦我们不能利用这种分布式并行处理,那么海量数据只能是低效处理了。再往细处说,就是一份海量数据需要多少map来处理,一个map能处理多少数据,这些都制约着数据处理的效率。
HIVE的执行效率问题可以大概归结:
1.如何使大数据量利用合适的map数
2.如何使单个map处理合适的数据记录数
这两点是相辅相成的,在保证单个map处理合适的记录数后,就能确定整个数据量需要多少map数。貌似好像解决了,但是这是个矛与盾的问题,不是那么好解决的,因为hive是根据数据量和文件数来确定map数,不会去考虑一个map处理多少记录数。我们要先去影响map数,然后再调整单个map处理的记录数,在特定情况下是要两手都要抓,两手都要硬,在矛与盾之间找到平衡。
1.如何使大数据量利用合适的map数
HIVE利用的默认split策略,基本上是使一个map处理近似于dfs.block.size大小的数据量,一般利用此策略基本可以完成大部分的数据处理。也就是说,通常情况下,能使我们的数据按照默认的split策略切分数据,得到的map数就是比较合适的,但是在使用HIVE中,我们不可能避免HQL不产生小文件(数据量远小于dfs.block.size),因为是否是小文件跟HQL的逻辑,数据分布,数据量等等都一定程度的关系,但小文件会浪费掉很多map,一个文件就需要一个map,并且一个文件可能就只包含几千,几万条记录,对map资源是个浪费。而且每个HQL如果占用太多map,在整个集群中说不定分配资源就成个问题,看似给了很多map,但是根本就抢不到资源。所以我们要合并小文件,那么使用数据时可以分配比较合理的map数。
合并小文件的方式不少,但各有利弊,不同的数据需求可以采用不同的方式:
1.设置HIVE的参数,使HIVE在数据生成时自动进行小文件合并,方法请参看:
2.设置HIVE的参数,使HIVE在获取数据是先进行小文件合并,set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
3.自己开发小文件合并工具,这个是自由度最高的方法
合并小文件是会消耗资源,何时合并小文件,哪些数据需要合并小文件一定要考虑全面点。
合并小文件的目的基本上要达到是一个map处理的数据量近似等于dfs.block.size。这样从整个namenode的quota,整个系统map/reduce资源利用率上能达到较好的效果。但如果单单追求消除小文件,针对于有些数据仓库应用是不利的。矛盾就产生了。例如HIVE上的一张表,数据量为240MB,那么hive应该给他一个文件,占用一个block(dfs.block.size=268435456),但是这张表只有2个字段,记录长度平均24B,那么这张表就有超过1千万的记录数,但是这么多记录数只有一个map来处理,一个map的效率是可想而知的。所以我们要在数据处理的map数和单map处理的记录数上做到平衡,这种平衡的依据更多是要考虑此数据处理在整个数据仓库模型中的地位,如果是个核心的表,会被N个数据处理所用到,在不更改数据模型的前提下,我们要更偏向于这张表的数据存储分散在多个小文件中,这样利用此表时就能利用更多的map,并且单个map处理记录数少一点,利用效率会更高。
2.如何使单个map/reduce处理合适的数据记录数
为了解决矛与盾的平衡,在合并小文件的同时,对特殊的数据处理,我们是要一定程度地拆分成小文件,但拆分的标准是:保证单个map处理的记录数是合适的,单个map处理的数据量在100W-300W之间是比较好的,同时也要考虑数据处理的复杂度。
简单的办法:数据冗余
接着上面提到的240MB表的例子,如果我们冗余一个没有任何意义的字段,使记录的长度增长到100B,表大小增长到1GB,那么需要4个map处理这1千万的数据,平均1个map处理250W左右的记录数,相较1个map处理1000W的记录数,效率是提升很多的。
但这种方法应用到数据处理的临时表刷新和使用时没问题的,但在最终的事实表或者汇总表冗余这么个个没有意义的字段就太不应该了。
另一种办法:
1.set mapred.reduce.tasks(预计一个文件包含的记录数,确定计划要输出的文件数)
2.利用distribute by rand(123)把数据分散到各个reduce里
举例说明,如何调整一个map处理的记录数。
hive> select count(1) from shaka01;
OK
29022319
Time taken: 92.263 seconds
--HDFS的block.size
hive> set dfs.block.size;
dfs.block.size=268435456
--shaka01对应的文件,每个文件大于block.size,利用此表时会有4个map执行,平均1个map处理750W条记录
336495295 /group/hive/shaka01/attempt_201206121523_2432943_m_000000_0
338594924 /group/hive/shaka01/attempt_201206121523_2432943_m_000001_0