对于一些应用,需要特殊的数据结构来存储数据。比如运行基于MapReduce的进程,当存储数据时,将每个二进制数据块放入它自己的文件,这样做使得后期不容易扩展。为此,Hadoop开发了一系列高级容器。
一、SequenceFile类
包为:org.apache.hadoop.io.SequenceFileHadoop的SequenceFile类为二进制键值对提供了一个持续化的数据结构。它提供了 Writer, Reader and SequenceFile.Sorter 类能独立执行读、写以及排序操作。
如果想应用于日志文件格式,需要选择一个键(如LongWritable表示时间戳)和一个值(如Writable表示日志记录的数量)。
用SequenceFile类作为小型文件的容器也不错。HDFS和MapReduce是大型文件的利器,当我们把文件打包到一个SequenceFile类中,我们能够高效对小型文件进程存储和处理。
(1)创建一个SequenceFile类
该类提供了一种静态方法创建SequenceFile.Writer实例。而且有几个重载方法。hadoop推荐使用静态构造方法如下:
public static org.apache.hadoop.io.SequenceFile.Writer createWriter(FileContext fc, //文件上下文 Configuration conf, //配置信息 Path name, //文件path Class keyClass, //键类 Class valClass, //值类 CompressionType compressionType, //压缩类型 CompressionCodec codec, //压缩器 Metadata metadata, //文件头部metadata EnumSet<CreateFlag> createFlag, //给出创建的语义如overwrite org.apache.hadoop.fs.Options.CreateOpts... opts) //可选项 throws IOException存储在SequenceFile类中的键和值不一定必须是Writable。可以被SequenceFile类序列化和反序列的任何类型都可以使用。
在SequencdFile.Writer之后,就用append()方法写入键/值对。然后在结束的时候调用close()方法。(SequenceFile.Write实现了java.io.Closeable)。
下面就是一个SequenceFile类的程序例子:
public static void main(String[] args) throws IOException(
String uri = args[0]; //创建FileSystem的步骤已经在学习小结一中学过,这里就不再做介绍
Configuration conf = new Configuration();
FileSystem fs = FileSystem.gei(URI.create(uri), conf);
Paht path = new Path(uri);
IntWritable key = new IntWritable(); //创建键
Text value = new Text(); //创建值
SequencdFile.Writer writer = null;
try{
writer = SequencdFile.createWriter(fs,conf,path,key.getClass(),value.getClass() ); //创建Writer
key.set(data);
value.set(data);
writer.append(key,value);
}finally{
IOUtils.closeStream(writer);
}
}
(2)读取SequenceFile类
从头到尾读取序列文件,需要创建一个SequenceFile.Reader实例。反复调用next()方法之一遍历记录。使用哪一方法取决于所使用的序列化框架。比如Writable类型,可以为:
public boolean next(Writable key, Writable value); //使用键值作为参数,如果读取是一个键值对,返回true。如果读取到文件末尾,返回false。
下面就是一个读取序列文件的例子:
public static void main(String[] args){
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try{
reader = new SequenceFile.Reader(fs,path,conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); //使用Reflection工具类创建key的一个实例
Writable key = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition(); //取得reader当前位置
while(reader.next(key,value)){ //不断往下读取
String syncSeen = reader.syncSeen() ? " * " : " "; //插入同步点
System.out.printf(position,syncSeen,key,value);
position = reader.getPosition();
}
}finally{
IOUtils.closeStream(reader);
}
}
此程序能显示序列文件中同步点的位置。同步点是流中的一个点,如果reader失去对位置的判断,同步点就可用于重新同步记录边界,例如在查找流中任意一个位置之后。同步点由SequenceFile.Reader来记录,当序列文件被写入的时候,它会每隔几个记录就插入一个特殊的项来标记此同步点。插入的开销非常小。上面我就使用星号*标记同步点。
有两种方法查找序列文件中指定的位置。第一种是seek()方法,第二种是sync(long position)方法。
三、序列文件的格式
序列文件由一个头部和一个或多个记录组成。
序列文件有三种类型,分别为1无压缩类型,2有压缩类型,3和块压缩类型。它们的记录组成不同,但序列文件头都相同。
SequenceFile 头格式:
version - 3 字节SEQ,后接1字节版本号 keyClassName -键类名 valueClassName - 值类名 compression - 布尔类型,标识是否对键值对启用压缩 blockCompression - 布尔类型,标识是否对键值对启用块压缩 compression codec - 编码解码器类型 metadata - SequenceFile的Metadata sync - 一个同步标记记录头结尾记录的内部格式:
取决于是否启用压缩,如果是,要么是记录压缩,要么是块压缩。
1无压缩类型:如果没有启用压缩(默认设置)那么每个记录就由它的记录长度(字节数)、键的长度,键和值组成。长度字段为四字节。
2有压缩类型:记录压缩格式与无压缩格式基本相同,不同的是值字节是用定义在头部的编码器来压缩。注意,键是不压缩的。
3块压缩类型:块压缩一次压缩多个记录,因此它比记录压缩更紧凑,而且一般优先选择。当记录的字节数达到最小大小,才会添加到块。该最小值由io.seqfile.compress.blocksize中的属性定义。默认值是1000000字节。格式为记录数、键长度、键、值长度、值。
四、MapFile类
MapFile类是经过排序的带索引的SequenceFile,可以根据键进行查找。MapFile可以被认为是java.util.Map的一种持久化形式。它会无限增长,直至超过map在内存中占有的大小。
创建MapFile类的步骤和创建SequenceFile类差不多,这里我们就不再介绍。
(1)它有两个静态成员变量:
static String DATA_FILE_NAME; //数据文件名
static String INDEX_FILE_NAME; //索引文件名
(2)常用方法:
void rename(FileSystem fs, String oldName, String newName) //对已存在的map目录重命名
long fix(FileSystem fs, Path dir, Class<? extends Writable> keyClass, Class<? extends Writable> valueClass, boolean dryrun, Configuration conf) //该方法通过重新建立索引来定位出故障的MapFile,返回MapFile中无效的实体数