自定义Hadoop Map/Reduce输入文件切割InputFormat(2)

/**
 * Reports how far reading has advanced between {@code start} and {@code end}.
 *
 * @return {@code 0.0F} when {@code start == end}; otherwise
 *         {@code (pos - start) / (end - start)}, capped at {@code 1.0F}
 */
public float getProgress() {
    // A zero-length range would divide by zero, so report no progress instead.
    if (this.start == this.end) {
        return 0.0F;
    }
    final float consumed = (float) (this.pos - this.start);
    final float total = (float) (this.end - this.start);
    final float fraction = consumed / total;
    return Math.min(1.0F, fraction);
}

/**
 * Closes the underlying reader {@code in}, if one has been set.
 *
 * @throws IOException if closing {@code in} fails
 */
public synchronized void close() throws IOException {
    // Nothing to release when no reader was ever opened.
    if (this.in == null) {
        return;
    }
    this.in.close();
}

}

3)重写SearchRecordReader需要的LineReader,可作为SearchRecordReader的内部类。特别需要注意的是:文件是按指定大小的buffer分块读取的,因此必然会遇到一条完整的记录被切成两半的情况;当分隔符长度大于1个字符时,甚至分隔符本身也可能被切成两半。这些情况都必须做拼接处理。

public class LineReader {
// Carriage return (Hadoop default); commented out, not used by this reader.
//private static final byte CR = 13;
// Line feed (Hadoop default); commented out, not used by this reader.
//private static final byte LF = 10;

// The file is read one buffer at a time; the default buffer is 32 * 1024 * 1024 bytes.
private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
// Size in bytes of the read buffer allocated by the constructor.
private int bufferSize = DEFAULT_BUFFER_SIZE;
// Stream the records are read from.
private InputStream in;
// Holds the bytes returned by the most recent in.read(buffer) call.
private byte[] buffer;
// Value returned by the most recent in.read(buffer): the number of valid bytes in buffer.
private int bufferLength = 0;
// Index in buffer of the next byte readLine will examine.
private int bufferPosn = 0;

/**
 * Creates a reader over {@code in} with a freshly allocated buffer of
 * {@code bufferSize} bytes and an empty read position.
 *
 * @param in         stream to read records from
 * @param bufferSize size in bytes of the internal read buffer
 */
LineReader(InputStream in, int bufferSize) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[bufferSize];

    // Start with an empty buffer so the first readLine call fills it.
    this.bufferPosn = 0;
    this.bufferLength = 0;
}

/**
 * Creates a reader over {@code in} whose buffer size is taken from the
 * {@code "io.file.buffer.size"} entry of {@code conf}, falling back to
 * {@code DEFAULT_BUFFER_SIZE} when the entry is absent.
 *
 * @param in   stream to read records from
 * @param conf configuration queried via {@code getInt}
 *             (presumably Hadoop's Configuration; verify against the file's imports)
 * @throws IOException declared by the signature; nothing in this body throws it
 */
public LineReader(InputStream in, Configuration conf) throws IOException {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
}

/**
 * Closes the stream this reader was constructed with.
 *
 * @throws IOException if closing the stream fails
 */
public void close() throws IOException {
    this.in.close();
}

/**
 * Reads one record into {@code str}, delegating to
 * {@code readLine(Text, int, int)} with {@code Integer.MAX_VALUE} as
 * {@code maxBytesToConsume}.
 *
 * @param str           destination for the record
 * @param maxLineLength forwarded unchanged to the three-argument overload
 * @return whatever the three-argument overload returns
 * @throws IOException if the three-argument overload throws it
 */
public int readLine(Text str, int maxLineLength) throws IOException {
    final int noConsumeLimit = Integer.MAX_VALUE;
    return readLine(str, maxLineLength, noConsumeLimit);
}

/**
 * Reads one record into {@code str}, delegating to
 * {@code readLine(Text, int, int)} with {@code Integer.MAX_VALUE} for both
 * {@code maxLineLength} and {@code maxBytesToConsume}.
 *
 * @param str destination for the record
 * @return whatever the three-argument overload returns
 * @throws IOException if the three-argument overload throws it
 */
public int readLine(Text str) throws IOException {
    final int noLineLimit = Integer.MAX_VALUE;
    final int noConsumeLimit = Integer.MAX_VALUE;
    return readLine(str, noLineLimit, noConsumeLimit);
}

//以下是需要改写的部分_start,核心代码

/**
 * Reads the next record from the stream into {@code str}, where records are
 * delimited by the multi-byte {@code separator}.
 *
 * <p>The stream is consumed one buffer at a time, so both a record and the
 * separator itself may straddle two buffers; the separator match state is
 * carried across buffer refills to handle that. The separator is never
 * included in {@code str}.
 *
 * <p>Differences from the previous implementation (all bug fixes):
 * <ul>
 *   <li>A final record that is not followed by a separator (end of stream, or
 *       {@code maxBytesToConsume} reached) is now returned in {@code str}
 *       instead of being dropped.</li>
 *   <li>Separator matching uses a fallback table, so separators whose prefix
 *       repeats (for example {@code "\r\n"} preceded by {@code '\r'}) are no
 *       longer missed.</li>
 *   <li>When {@code maxLineLength} truncates the record, the separator is
 *       stripped based on the real record length rather than by cutting
 *       {@code sepLength} bytes of data off the truncated text.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code separator} and {@code sepLength} are declared outside
 * this method; this code assumes {@code sepLength == separator.length} and
 * {@code sepLength > 0} -- confirm against their declaration.
 *
 * @param str               receives the record bytes, without the separator
 * @param maxLineLength     maximum number of bytes stored in {@code str};
 *                          further bytes of the record are consumed but discarded
 * @param maxBytesToConsume once at least this many bytes have been consumed,
 *                          no further buffer is scanned
 * @return number of bytes consumed from the stream, including the separator;
 *         {@code 0} when the stream was already exhausted
 * @throws IOException if reading the stream fails, or if more than
 *                     {@code Integer.MAX_VALUE} bytes were consumed
 */
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
    str.clear();
    final int[] fallback = buildSeparatorFallbackTable();
    final Text record = new Text();
    int txtLength = 0;          // bytes stored in record so far (capped by maxLineLength)
    long bytesConsumed = 0L;    // bytes taken from the stream, separator included
    boolean newline = false;    // true once a complete separator has been matched
    int sepPosn = 0;            // number of leading separator bytes currently matched

    do {
        // Current buffer exhausted: refill it from the stream.
        if (this.bufferPosn >= this.bufferLength) {
            this.bufferPosn = 0;
            this.bufferLength = in.read(buffer);

            // End of stream: stop and hand back whatever was collected.
            if (this.bufferLength <= 0) {
                break;
            }
        }

        final int startPosn = this.bufferPosn;
        while (!newline && this.bufferPosn < this.bufferLength) {
            final byte current = this.buffer[this.bufferPosn++];

            // On mismatch, fall back to the longest separator prefix that is still
            // a suffix of the bytes seen so far, instead of restarting from scratch.
            while (sepPosn > 0 && current != separator[sepPosn]) {
                sepPosn = fallback[sepPosn - 1];
            }
            if (current == separator[sepPosn]) {
                sepPosn++;
            }
            newline = (sepPosn == sepLength);
        }

        int readLength = this.bufferPosn - startPosn;
        bytesConsumed += readLength;

        // Store at most maxLineLength bytes; the separator is trimmed after the loop
        // because its first bytes may have been appended from an earlier buffer.
        if (readLength > maxLineLength - txtLength) {
            readLength = maxLineLength - txtLength;
        }
        if (readLength > 0) {
            record.append(this.buffer, startPosn, readLength);
            txtLength += readLength;
        }
    } while (!newline && (bytesConsumed < maxBytesToConsume));

    if (bytesConsumed > (long) Integer.MAX_VALUE) {
        throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }

    // The record proper is everything consumed minus the trailing separator; the
    // stored text may already be shorter than that if maxLineLength truncated it.
    int keepLength = txtLength;
    if (newline) {
        keepLength = (int) Math.min((long) txtLength, bytesConsumed - sepLength);
    }
    if (keepLength > 0) {
        str.set(record.getBytes(), 0, keepLength);
    }

    return (int) bytesConsumed;
}

/**
 * Builds the Knuth-Morris-Pratt fallback table for {@code separator}: entry
 * {@code i} is the length of the longest proper prefix of the separator that
 * is also a suffix of its first {@code i + 1} bytes.
 */
private int[] buildSeparatorFallbackTable() {
    final int[] table = new int[sepLength];
    int matched = 0;
    for (int i = 1; i < sepLength; i++) {
        while (matched > 0 && separator[i] != separator[matched]) {
            matched = table[matched - 1];
        }
        if (separator[i] == separator[matched]) {
            matched++;
        }
        table[i] = matched;
    }
    return table;
}

//以下是需要改写的部分_end

//以下是hadoop-core中LineReader的源码_start

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/6591f87de942ee440d89bc3c8a719b68.html