Hadoop API 学习笔记(5)

对于一些应用,需要特殊的数据结构来存储数据。比如运行基于MapReduce的进程,当存储数据时,将每个二进制数据块放入它自己的文件,这样做使得后期不容易扩展。为此,Hadoop开发了一系列高级容器。

一、SequenceFile类

包为:org.apache.hadoop.io.SequenceFile

Hadoop的SequenceFile类为二进制键值对提供了一个持续化的数据结构。它提供了 Writer, Reader and SequenceFile.Sorter 类能独立执行读、写以及排序操作。

如果想应用于日志文件格式,需要选择一个键(如LongWritable表示时间戳)和一个值(如Writable表示日志记录的数量)。

用SequenceFile类作为小型文件的容器也不错。HDFS和MapReduce是大型文件的利器,当我们把文件打包到一个SequenceFile类中,我们能够高效对小型文件进程存储和处理。


(1)创建一个SequenceFile类

该类提供了一种静态方法创建SequenceFile.Writer实例。而且有几个重载方法。hadoop推荐使用静态构造方法如下:

public static org.apache.hadoop.io.SequenceFile.Writer createWriter(FileContext fc, //文件上下文 Configuration conf, //配置信息 Path name, //文件path Class keyClass, //键类 Class valClass, //值类 CompressionType compressionType, //压缩类型 CompressionCodec codec, //压缩器 Metadata metadata, //文件头部metadata EnumSet<CreateFlag> createFlag, //给出创建的语义如overwrite org.apache.hadoop.fs.Options.CreateOpts... opts) //可选项 throws IOException

存储在SequenceFile类中的键和值不一定必须是Writable。可以被SequenceFile类序列化和反序列的任何类型都可以使用。

在SequencdFile.Writer之后,就用append()方法写入键/值对。然后在结束的时候调用close()方法。(SequenceFile.Write实现了java.io.Closeable)。

下面就是一个SequenceFile类的程序例子:

public static void main(String[] args) throws IOException(

String uri = args[0];                                               //创建FileSystem的步骤已经在学习小结一中学过,这里就不再做介绍

Configuration conf = new Configuration();

FileSystem fs = FileSystem.gei(URI.create(uri), conf);

Paht path = new Path(uri);


IntWritable key = new IntWritable();          //创建键

Text value = new Text();                           //创建值

SequencdFile.Writer writer = null;

try{

writer = SequencdFile.createWriter(fs,conf,path,key.getClass(),value.getClass() );       //创建Writer

key.set(data);

value.set(data);

writer.append(key,value);

}finally{

IOUtils.closeStream(writer);

}

}


(2)读取SequenceFile类

从头到尾读取序列文件,需要创建一个SequenceFile.Reader实例。反复调用next()方法之一遍历记录。使用哪一方法取决于所使用的序列化框架。比如Writable类型,可以为:

public boolean next(Writable key, Writable value);    //使用键值作为参数,如果读取是一个键值对,返回true。如果读取到文件末尾,返回false。

下面就是一个读取序列文件的例子:

public static void main(String[] args){

String uri = args[0];

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(URI.create(uri),conf);

Path path = new Path(uri);


SequenceFile.Reader reader = null;

try{

reader = new SequenceFile.Reader(fs,path,conf);

Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);  //使用Reflection工具类创建key的一个实例

Writable key = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

long position = reader.getPosition();           //取得reader当前位置

while(reader.next(key,value)){                    //不断往下读取

String syncSeen = reader.syncSeen() ? " * " : " ";    //插入同步点

System.out.printf(position,syncSeen,key,value);

position = reader.getPosition();

}

}finally{

IOUtils.closeStream(reader);

}

}

此程序能显示序列文件中同步点的位置。同步点是流中的一个点,如果reader失去对位置的判断,同步点就可用于重新同步记录边界,例如在查找流中任意一个位置之后。同步点由SequenceFile.Reader来记录,当序列文件被写入的时候,它会每隔几个记录就插入一个特殊的项来标记此同步点。插入的开销非常小。上面我就使用星号*标记同步点。

有两种方法查找序列文件中指定的位置。第一种是seek()方法,第二种是sync(long position)方法。


三、序列文件的格式

序列文件由一个头部和一个或多个记录组成。

序列文件有三种类型,分别为1无压缩类型,2有压缩类型,3和块压缩类型。它们的记录组成不同,但序列文件头都相同。

SequenceFile 头格式:

version - 3 字节SEQ,后接1字节版本号 keyClassName -键类名 valueClassName - 值类名 compression - 布尔类型,标识是否对键值对启用压缩 blockCompression - 布尔类型,标识是否对键值对启用块压缩 compression codec - 编码解码器类型 metadata - SequenceFile的Metadata  sync - 一个同步标记记录头结尾

记录的内部格式:

取决于是否启用压缩,如果是,要么是记录压缩,要么是块压缩。

1无压缩类型:如果没有启用压缩(默认设置)那么每个记录就由它的记录长度(字节数)、键的长度,键和值组成。长度字段为四字节。

2有压缩类型:记录压缩格式与无压缩格式基本相同,不同的是值字节是用定义在头部的编码器来压缩。注意,键是不压缩的。

3块压缩类型:块压缩一次压缩多个记录,因此它比记录压缩更紧凑,而且一般优先选择。当记录的字节数达到最小大小,才会添加到块。该最小值由io.seqfile.compress.blocksize中的属性定义。默认值是1000000字节。格式为记录数、键长度、键、值长度、值。




四、MapFile类

MapFile类是经过排序的带索引的SequenceFile,可以根据键进行查找。MapFile可以被认为是java.util.Map的一种持久化形式。它会无限增长,直至超过map在内存中占有的大小。

创建MapFile类的步骤和创建SequenceFile类差不多,这里我们就不再介绍。

(1)它有两个静态成员变量:

static String DATA_FILE_NAME;         //数据文件名

static String INDEX_FILE_NAME;        //索引文件名


(2)常用方法:

void rename(FileSystem fs, String oldName, String newName)   //对已存在的map目录重命名


long fix(FileSystem fs, Path dir, Class<? extends Writable> keyClass, Class<? extends Writable> valueClass, boolean dryrun, Configuration conf)        //该方法通过重新建立索引来定位出故障的MapFile,返回MapFile中无效的实体数

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/6bf24715fd59cbdc242137e47f68dd27.html