Hadoop API 学习小结(3)

文件压缩带来两大好处,它减少了存储文件所需的空间并且加快了数据在网络上或者从磁盘上或到磁盘的传输速度。在处理大容量数据时,这些都是很重要的,因此对于Hadoop中如何使用压缩,值得仔细考虑。一下压缩格式可用于hadoop。

压缩格式                        工具                 算法                     文件扩展名                 多文件                                可分割性

DEFLATE*                       无                DEFLATE                .deflate                          不                                     不

Gzip                               gzip               DEFLATE                 .gz                                    不                                     不

ZIP                                   zip               DEFLATE                ..zip                                 是                                     是,在文件范围内

bzip2                              bzip2              bzip2                       .bz2                                  不                                  是

LZO                                lzop                LZO                        .lzo                                  不                                     不


更快的压缩和解压速度通常会耗费更多的空间。-1表示速度最优,-9表示空间最优。例如下面命令使用最快的压缩算法创建一个压缩文件file.gz

gzip   -1    file

上面的可分割性指压缩格式是否支持分割,也就是说,你是否可以找到数据流中任何一个点并且从某点开始读取。可分割压缩格式特别适合mapreduce


二、编码,解码器。

常用编码器:

org.apache.hadoop.io.compress.DefaultCodec            对应压缩格式为DEFLATE

org.apache.hadoop.io.compress.GzipCodec                 对应Gzip


三、hadoop API中,已经有实现编码,解码的接口。


接口:

(1)CompressionCodec 接口       

该接口包括解压,压缩的方法流

常用方法:  createInputStream(InputStream in)   //从指定输入流中创建一个压缩流

createOutputStream(OutputStream out)   //从给定输出流中创建一个压缩流

getCompressorType()   //返回需要的编码器的类型

getDefaultExtension()   //返回编码器的文件扩展名

如下所示,压缩从标准输入读取的数据并将它写到标准输出

Class<?> codecClass = class.forName(codecClassname);     //使用指定编码器类来创建一个压缩类,比如GzipCodec编码器

Configuration  conf = new Configuration();

CompressionCodec codec = (CompressionCodec) ReflectionUtil.newInstance(codexClass,conf);    //创建新的实例

CompressionOutputStream out = codec.createOutputStream(System.out);                 //获得压缩好的out

IOUtils.copyBytes(System.in,out,4096,false);   //将输入System.in复制到经过CompressionOutputStream的输出out

out.finish();         //结束向压缩流写入数据,但不关闭流                                   


(2)CompressionCodeFactory类

该类为工厂模式,从给定文件名找到编解码器。

方法:

CompressionCodec  getCodec(Path file)  //根据文件后缀获得相关的压缩编解码器

String removeSuffix(String filename, String suffix)   //如果文件有后缀名,去掉后缀名

四、本地库

考虑到性能,最好使用一个本地库(native library)来压缩和解压。例如,在一个测试中,使用本地gzip压缩库减少了解压时间50%,压缩时间减少了大约10%(与内置的java实现比较)。hadoop带有预置的32位和64位linux本地压缩库,支持的格式有DEFLATE、Gzip、LZO,位于库/本地目录。对于其他平台,需要自己编译库。

默认情况下,hadoop会在它运行的平台上查找本地库,如果发现,就自动加载。这意味着不必要更改任何设置就可以使用本地库。在某些情况下,可能希望禁用本地库,比如在调试压缩相关问题时候。为此,将属性hadoop.native.lib设置为false即可。

如果要用本地库在应用中大量执行压缩解压任务,可以考虑使用CodePool(压缩解码池)。从而重用压缩程序和解压程序,节约创建对象的开销。

CodePool类常用方法有:

static Compressor getCompressor(CompressionCodec codec, Configuration conf)  //根据指定压缩器从池中获得一个Compressor实例

static void  returnCompressor(Compressor compressor)    //把Compressor实例返回到池中,下一次即可复用。


五、压缩和输入分割

在考虑如何压缩那些将由MapReduce处理的数据时,考虑压缩格式是否支持分割是很重要的。考虑储存在HDFS中的未压缩的文件,其大小为1GB.HDFS的块大小为64MB,所以该文件被储存为16块,将此文件用作输入的MapReduce作业会创建16个输入分片(split)。每个分片都被作为一个独立map任务的输入单独进行处理。


前面我提到过,gzip和DEFLATE不支持分割,所以map任务也就不可能独立于其他块读取块中的数据。这样就必须以本地化为代价,一个map任务将处理16个HDFS块。由于map任务少,作业分割的粒度不够细,从而导致运行时间变长。那么,我们应采用哪种压缩格式呢,显然,根据不同情况,可选择支持分割机制的压缩格式如bzip2,或者使用支持压缩和分割机制的Sequence File(序列文件)。对于大型文件,不要对整个文件使用不支持分割的压缩格式,因为这样会损失本地性优势,从而降低MapReduce应用的性能。


六、在MapReduce中使用压缩。

如果输入的文件是压缩过的,那么在被MapReduce读取时,它们会被自动解压,根据文件的扩展名来决定使用哪一个压缩解码器。

如果要压缩MapReduce作业的输出,请在作业配置文件中将mapred.output.compress属性设置为true,将mapred.output.compression.codec属性设置为自己想用的压缩编码/解码器的类型。如下的mapreduce程序,可以这样在绿色部分这样设置:

JobConf conf = new JobConf(className);

conf.setJobName("jobname");

FileInputFormat.addInputPath(conf,path);

FileOutputFormat.setOutputPath(conf,path);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);


  conf.setBoolean("mapred.output.compress",true);

conf.setClass("mapred.output.compression.codec",GzipCodec.class,CompressionCodec.class);


  conf.setMapperClass(mapperclassname);

conf.setCombinerClass(reduceclassname);

conf.setReducerClass(reduceclassname);

JobClient.runJob(conf);

改程序运行后,最终输出的部分都是被压缩过的。可以设置mapred.output.compression.type属性来控制压缩类型。默认为RECORD,他压缩单独的记录。推荐更改为BLOCK,因为它是压缩成一组记录,而且有更好的压缩比。

我们其实还可以进一步优化,在map作业输出结果中进行压缩,因为map输出会被写入本地磁盘并通过网络传输到reduce节点。如果使用LZO之类的快速压缩,能,较少传输的数据量,提示性能。如:

conf.setCompressMapOutput(true);

conf.setMapOutputCompressorClass(GzipCodec.class);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/1c0429fd753294a8929b9ceb1b1b64c8.html