@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
/**A.compareTo(B)
* 如果比较相同,则比较结果为0
* 如果A大于B,则比较结果为1
* 如果A小于B,则比较结果为-1
*
*/
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
//此时实现的是升序排列
return second.compareTo(tp.second);
}
}
以下为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
public RecordReader<Text, BytesWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return new WholeFileRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// TODO Auto-generated method stub
return false;
}
}
以下为WholeFileRecordReader类
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
private FileSplit fileSplit;
private FSDataInputStream fis;
private Text key = null;
private BytesWritable value = null;
private boolean processed = false;
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
// fis.close();
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.key;
}