在Hadoop中重写FileInputFormat类以处理二进制格式存

最近开始使用MapReduce,发现网上大部分例子都是对文本数据进行处理的,也就是说在读取输入数据时直接使用默认的TextInputFormat进行处理即可。对于文本数据处理,这个类还是能满足一部分应用场景。但是如果要处理以二进制形式结构化记录存储的文件时,这些类就不再适合了。

本文以一个简单的应用场景为例:对按照二进制格式存储的整数做频数统计。当然,也可以在此基础上实现排序之类的其他应用。实现该应用的主要难点就是如何处理输入数据。参考《权威指南·第三版》得知需要继承FileInputFormat这个类,并实现以下三个方法:

class MyInputFormat extends FileInputFormat<Type1, Type2> {
 /*
  * 查询判断当前文件是否可以分块?"true"为可以分块,"false"表示不进行分块
  */
 protected boolean isSplitable(Configuration conf, Path path) {
 
 }
 
 /*
  * MapReduce的客户端调用此方法得到所有的分块,然后将分块发送给MapReduce服务端。
  * 注意,分块中不包含实际的信息,而只是对实际信息的分块信息。具体的说,每个分块中
  * 包含当前分块对应的文件路径,当前分块在该文件中起始位置,当前分块的长度以及对应的
  * 实际数据所在的机器列表。在实现这个函数时,将这些信息填上即可。
  * */
 public List<InputSplit> getSplits(Configuration conf) throws IOException {
 }

/*
  * 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的参数有两个:一个分块(split)和作业的配置信息(context).
  * 在Mapper的run函数中可以看到MapReduce框架执行Map的逻辑:
  * public void run(Context context) throws IOException, InterruptedException {
  *   setup(context);
  *   调用RecordReader方法的nextKeyValue,生成新的键值对。如果当前分块(Split)中已经处理完毕了,则nextKeyValue会返回false.退出run函数
  *  while (context.nextKeyValue()) { 
  *   map(context.getCurrentKey(), context.getCurrentValue(), context);
  *  }
  *  cleanup(context);
  * }
  **/
 public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
   throws IOException, InterruptedException {
 }
}

--------------------------------------分割线 --------------------------------------

Ubuntu 13.04上搭建Hadoop环境

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)

Ubuntu下Hadoop环境的配置

单机版搭建Hadoop环境图文教程详解

--------------------------------------分割线 --------------------------------------

在RecordReader函数中实现以下几个接口

public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
 /*关闭文件流
  * */
 public void close() {}

/*
  * 获取处理进度
  **/
 public float getProgress() {}

/*
  * 获取当前的Key
  * */
 public LongWritable getCurrentKey() throws IOException,
 InterruptedException {}

/* 获取当前的Value
  * */
 public IntWritable getCurrentValue() throws IOException,InterruptedException {}

/*
  * 进行初始化工作,打开文件流,根据分块信息设置起始位置和长度等等
  * */
 public void initialize(InputSplit inputSplit, TaskAttemptContext context)
   throws IOException, InterruptedException {}

/*生成下一个键值对
  **/
 public boolean nextKeyValue() throws IOException, InterruptedException {
 }
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/52c41275de44647043bce6133ce242a9.html