自定义Hadoop Map/Reduce输入文件切割InputFormat

Hadoop会对原始输入文件进行文件切割,然后把每个split传入mapper程序中进行处理,FileInputFormat是所有以文件作 为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方 法。至于获得记录的方法是有不同的子类进行实现的。

那么,FileInputFormat是怎样将他们划分成splits的呢?FileInputFormat只划分比HDFS block大的文件,所以如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

hadoop默认的InputFormat是TextInputFormat,重写了FileInputFormat中的createRecordReader和isSplitable方法。该类使用的reader是LineRecordReader,即以回车键(CR = 13)或换行符(LF = 10)为行分隔符。

但大多数情况下,回车键或换行符作为输入文件的行分隔符并不能满足我们的需求,通常用户很有可能会输入回车键、换行符,所以通常我们会定义不可见字符(即用户无法输入的字符)为行分隔符,这种情况下,就需要新写一个InputFormat。

又或者,一条记录的分隔符不是字符,而是字符串,这种情况相对麻烦;还有一种情况,输入文件的主键key已经是排好序的了,需要hadoop做的只是把相 同的key作为一个数据块进行逻辑处理,这种情况更麻烦,相当于免去了mapper的过程,直接进去reduce,那么InputFormat的逻辑就相 对较为复杂了,但并不是不能实现。

1、改变一条记录的分隔符,不用默认的回车或换行符作为记录分隔符,甚至可以采用字符串作为记录分隔符
1)自定义一个InputFormat,继承FileInputFormat,重写createRecordReader方法,如果不需要分片或者需要改变分片的方式,则重写isSplitable方法,具体代码如下:

public class FileInputFormatB extends FileInputFormat<LongWritable, Text> {

@Override

public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) {
return new SearchRecordReader("\b");

}

@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
// 输入文件不分片
return false;
}
}

2)关键在于定义一个新的SearchRecordReader继承RecordReader,支持自定义的行分隔符,即一条记录的分隔符。标红的地方为与hadoop默认的LineRecordReader不同的地方。

public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);

private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
//行分隔符,即一条记录的分隔符
private byte[] separator = {'\b'};
private int sepLength = 1;

public IsearchRecordReader(){
}
public IsearchRecordReader(String seps){
this.separator = seps.getBytes();
sepLength = separator.length;
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

this.start = split.getStart();
this.end = (this.start + split.getLength());
Path file = split.getPath();
this.compressionCodecs = new CompressionCodecFactory(job);
CompressionCodec codec = this.compressionCodecs.getCodec(file);

// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
this.in = new LineReader(codec.createInputStream(fileIn), job);
this.end = Long.MAX_VALUE;
} else {
if (this.start != 0L) {
skipFirstLine = true;
this.start -= sepLength;
fileIn.seek(this.start);
}
this.in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start));

if(newSize > 0){
start += newSize;
}
}

this.pos = this.start;
}

public boolean nextKeyValue() throws IOException {
if (this.key == null) {
this.key = new LongWritable();
}
this.key.set(this.pos);
if (this.value == null) {
this.value = new Text();
}
int newSize = 0;
while (this.pos < this.end) {
newSize = this.in.readLine(this.value, this.maxLineLength, Math.max(
(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength));

if (newSize == 0) {
break;
}
this.pos += newSize;
if (newSize < this.maxLineLength) {
break;
}

LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
}

if (newSize == 0) {
//读下一个buffer
this.key = null;
this.value = null;
return false;
}
//读同一个buffer的下一个记录
return true;
}

public LongWritable getCurrentKey() {
return this.key;
}

public Text getCurrentValue() {
return this.value;
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/6591f87de942ee440d89bc3c8a719b68.html