  public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
    str.clear();
    int txtLength = 0;          // length of str so far (avoids calling str.getLength())
    int newlineLength = 0;      // terminator length found: 0 (none yet), 1 (LF or lone CR) or 2 (CRLF)
    boolean prevCharCR = false; // true if the previous byte was CR
    long bytesConsumed = 0L;
    do {
      int startPosn = this.bufferPosn; // resume where the previous call stopped
      if (this.bufferPosn >= this.bufferLength) {
        // Buffer exhausted: refill it from the underlying stream.
        startPosn = this.bufferPosn = 0;
        if (prevCharCR) bytesConsumed++; // count the CR held back at the end of the old buffer
        this.bufferLength = this.in.read(this.buffer);
        if (this.bufferLength <= 0) break; // EOF
      }
      for (; this.bufferPosn < this.bufferLength; this.bufferPosn++) { // scan for a newline
        if (this.buffer[this.bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          this.bufferPosn++; // the next call starts after the LF
          break;
        }
        if (prevCharCR) { // CR followed by a non-LF byte: the lone CR ends the line
          newlineLength = 1;
          break;
        }
        prevCharCR = this.buffer[this.bufferPosn] == CR;
      }
      int readLength = this.bufferPosn - startPosn;
      if ((prevCharCR) && (newlineLength == 0))
        --readLength; // CR is the last byte of the buffer; it may be the first half of a CRLF
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength; // bytes beyond maxLineLength are consumed but not appended
      }
      if (appendLength > 0) {
        str.append(this.buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while ((newlineLength == 0) && (bytesConsumed < maxBytesToConsume));
    if (bytesConsumed > (long) Integer.MAX_VALUE)
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    return (int) bytesConsumed;
  }
  // End of the LineReader source from hadoop-core
}
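The following minimal, stdlib-only sketch isolates the CR/LF bookkeeping. It reuses the same loop with a deliberately tiny buffer, so lines and CRLF pairs straddle buffer refills. The class name SimpleLineSplitter and its split helper are hypothetical. A ByteArrayOutputStream stands in for Hadoop's Text, and the maxLineLength / maxBytesToConsume limits are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stdlib-only sketch of the readLine loop above (length limits omitted).
public class SimpleLineSplitter {
    private static final byte CR = '\r';
    private static final byte LF = '\n';

    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0;
    private int bufferPosn = 0;

    public SimpleLineSplitter(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    // Reads one line into out without its terminator; returns bytes consumed, 0 at EOF.
    public int readLine(ByteArrayOutputStream out) throws IOException {
        out.reset();
        int newlineLength = 0;
        boolean prevCharCR = false;
        long bytesConsumed = 0;
        do {
            int startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                startPosn = bufferPosn = 0;
                if (prevCharCR) bytesConsumed++; // CR held back from the previous buffer
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) break;     // EOF
            }
            for (; bufferPosn < bufferLength; bufferPosn++) {
                if (buffer[bufferPosn] == LF) {
                    newlineLength = prevCharCR ? 2 : 1;
                    bufferPosn++;
                    break;
                }
                if (prevCharCR) { // lone CR terminates the line
                    newlineLength = 1;
                    break;
                }
                prevCharCR = buffer[bufferPosn] == CR;
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) readLength--; // trailing CR: decide after next refill
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > 0) out.write(buffer, startPosn, appendLength);
        } while (newlineLength == 0);
        return (int) bytesConsumed;
    }

    // Splits a whole string into lines using a buffer of the given (small) size.
    public static List<String> split(String text, int bufferSize) {
        SimpleLineSplitter reader = new SimpleLineSplitter(
                new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), bufferSize);
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        List<String> lines = new ArrayList<>();
        try {
            while (reader.readLine(line) > 0) {
                lines.add(new String(line.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

With a 3-byte buffer, `split("ab\r\ncd\ref\n", 3)` still yields `ab`, `cd`, `ef`. This works because a CR at the end of a buffer is held back until the next refill shows whether an LF follows.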
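2. The data is already sorted by the primary key, and records with the same key are guaranteed to be adjacent. Assume the first field of each record is the key. If we keep using the LineReader above, its core method readLine must compare the id (key) of each record with that of the previous record using equals(). It splits only when the two differ; when they match, it moves on to the next record. I won't paste that code here. The pitfall is the same as before: at the hand-off between two consecutive buffers, a record can easily be cut in two, with one half at the end of the previous buffer and the other half at the start of the next.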
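One way to avoid the half-record problem is to compare keys only after whole lines have been reassembled, rather than inside readLine's byte loop as described above. The sketch below takes that approach, using a plain java.io.BufferedReader and a one-line lookahead. The class KeyGroupingReader, the tab delimiter, and the groupAll helper are all hypothetical. In a Hadoop RecordReader the same idea would sit on top of LineReader, but that wiring is not shown here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: groups consecutive lines that share the same first field.
// Assumes tab-separated records already sorted by that key.
public class KeyGroupingReader {
    private final BufferedReader in;
    private String pending; // first line of the next group, read ahead during the key check

    public KeyGroupingReader(BufferedReader in) {
        this.in = in;
    }

    // The key is everything before the first tab (the whole line if there is no tab).
    private static String keyOf(String line) {
        int tab = line.indexOf('\t');
        return tab < 0 ? line : line.substring(0, tab);
    }

    // Returns all lines of the next key group, or null when the input is exhausted.
    public List<String> nextGroup() throws IOException {
        String first = (pending != null) ? pending : in.readLine();
        pending = null;
        if (first == null) return null;
        String key = keyOf(first);
        List<String> group = new ArrayList<>();
        group.add(first);
        String line;
        while ((line = in.readLine()) != null) {
            if (!keyOf(line).equals(key)) {
                pending = line; // key changed: split here and keep this line for the next group
                break;
            }
            group.add(line);
        }
        return group;
    }

    // Convenience helper: groups an in-memory string.
    public static List<List<String>> groupAll(String text) {
        KeyGroupingReader reader = new KeyGroupingReader(new BufferedReader(new StringReader(text)));
        List<List<String>> groups = new ArrayList<>();
        try {
            List<String> g;
            while ((g = reader.nextGroup()) != null) groups.add(g);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return groups;
    }
}
```

The lookahead line (`pending`) is the state that must survive between calls. It plays the same role as the CR that readLine holds back across buffer refills.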
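The advantage of this approach is that it removes the reduce step, which can greatly improve efficiency. The map phase is usually quite fast, and the reduce phase is typically where most of the time goes.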
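Dropping the reduce works because, with key-sorted input, a mapper can finish a key as soon as the next key appears. In Hadoop this corresponds to a map-only job (`Job.setNumReduceTasks(0)`). The stdlib sketch below shows the idea for a per-key sum. The class SortedKeySums and the "key<TAB>number" record layout are assumptions. The sketch also presumes that every line of a key reaches the same mapper, i.e. no key group straddles two input splits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of map-side aggregation over key-sorted "key<TAB>number" lines.
// Only the current key and its running total are kept in memory.
public class SortedKeySums {
    public static List<String> sumByKey(List<String> sortedLines) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        long total = 0;
        for (String line : sortedLines) {
            int tab = line.indexOf('\t');
            if (tab < 0) continue; // skip malformed lines (assumption: no tab means no value)
            String key = line.substring(0, tab);
            long value = Long.parseLong(line.substring(tab + 1).trim());
            if (currentKey != null && !currentKey.equals(key)) {
                out.add(currentKey + "\t" + total); // key changed: the previous group is complete
                total = 0;
            }
            currentKey = key;
            total += value;
        }
        if (currentKey != null) out.add(currentKey + "\t" + total); // flush the last group
        return out;
    }
}
```

In a real Mapper, the final flush would go in `cleanup()`, because `map()` is called once per record and cannot tell which record is the last.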