最近开始使用MapReduce,发现网上大部分例子都是对文本数据进行处理的,也就是说在读取输入数据时直接使用默认的TextInputFormat进行处理即可。对于文本数据处理,这个类还是能满足一部分应用场景。但是如果要处理以二进制形式结构化记录存储的文件时,这些类就不再适合了。
本文以一个简单的应用场景为例:对按照二进制格式存储的整数做频数统计。当然,也可以在此基础上实现排序之类的其他应用。实现该应用的主要难点就是如何处理输入数据。参考《权威指南·第三版》得知需要继承FileInputFormat这个类,并实现以下三个方法:
class MyInputFormat extends FileInputFormat<Type1, Type2> {
/*
* 查询判断当前文件是否可以分块?"true"为可以分块,"false"表示不进行分块
*/
protected boolean isSplitable(Configuration conf, Path path) {
}
/*
* MapReduce的客户端调用此方法得到所有的分块,然后将分块发送给MapReduce服务端。
* 注意,分块中不包含实际的信息,而只是对实际信息的分块信息。具体的说,每个分块中
* 包含当前分块对应的文件路径,当前分块在该文件中起始位置,当前分块的长度以及对应的
* 实际数据所在的机器列表。在实现这个函数时,将这些信息填上即可。
* */
public List<InputSplit> getSplits(Configuration conf) throws IOException {
}
/*
* 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的参数有两个:一个分块(split)和作业的配置信息(context).
* 在Mapper的run函数中可以看到MapReduce框架执行Map的逻辑:
* public void run(Context context) throws IOException, InterruptedException {
* setup(context);
* 调用RecordReader方法的nextKeyValue,生成新的键值对。如果当前分块(Split)中已经处理完毕了,则nextKeyValue会返回false.退出run函数
* while (context.nextKeyValue()) {
* map(context.getCurrentKey(), context.getCurrentValue(), context);
* }
* cleanup(context);
* }
**/
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
}
}
--------------------------------------分割线 --------------------------------------
Ubuntu 12.10 +Hadoop 1.2.1版本集群配置
Ubuntu上搭建Hadoop环境(单机模式+伪分布模式)
--------------------------------------分割线 --------------------------------------
在RecordReader函数中实现以下几个接口:
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
/*关闭文件流
* */
public void close() {}
/*
* 获取处理进度
**/
public float getProgress() {}
/*
* 获取当前的Key
* */
public LongWritable getCurrentKey() throws IOException,
InterruptedException {}
/* 获取当前的Value
* */
public IntWritable getCurrentValue() throws IOException,InterruptedException {}
/*
* 进行初始化工作,打开文件流,根据分块信息设置起始位置和长度等等
* */
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {}
/*生成下一个键值对
**/
public boolean nextKeyValue() throws IOException, InterruptedException {
}
}