自定义InputFormat(新版API):把整个文件当成一条输入记录
主要参考了 LineRecordReader 的源代码实现,其中部分细节尚未完全理解
WholeFileInputFormat import java.io.IOException; import org.apache.Hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> { @Override public RecordReader<Text, BytesWritable> createRecordReader( InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new WholeFileRecordReader(); } @Override protected boolean isSplitable(JobContext context, Path filename) { // TODO Auto-generated method stub return false; } } WholeFileRecordReader import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> { private FileSplit fileSplit; private FSDataInputStream fis; private Text key = null; private BytesWritable value = null; private boolean processed = false; @Override public void close() throws IOException { // TODO Auto-generated method stub //fis.close(); } @Override public Text getCurrentKey() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.key; } @Override public 
BytesWritable getCurrentValue() throws IOException, InterruptedException { // TODO Auto-generated method stub return this.value; } @Override public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext) throws IOException, InterruptedException { fileSplit = (FileSplit) inputSplit; Configuration job = tacontext.getConfiguration(); Path file = fileSplit.getPath(); FileSystem fs = file.getFileSystem(job); fis = fs.open(file); } @Override public boolean nextKeyValue() { if(key == null){ key = new Text(); } if(value == null){ value = new BytesWritable(); } if(!processed){ byte[] content = new byte[(int) fileSplit.getLength()]; Path file = fileSplit.getPath(); System.out.println(file.getName()); key.set(file.getName()); try { IOUtils.readFully(fis, content, 0, content.length); //value.set(content, 0, content.length); value.set(new BytesWritable(content)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ IOUtils.closeStream(fis); } processed = true; return true; } return false; } @Override public float getProgress() throws IOException, InterruptedException { // TODO Auto-generated method stub return processed? fileSplit.getLength():0; } } 验证 public static class mapper extends Mapper<Text, BytesWritable, Text, Text>{ @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub context.write(key, new Text(value.getBytes())); } } note:value是BytesWritable类型,显示为十六进制数, new Text(value.getBytes()) 变成字符串形式 自定义InputFormat 新版API 把真个文件当成一条输入