Hadoop：新版 API 自定义 InputFormat，把整个文件作为一条记录

自定义 InputFormat（新版 API）：把整个文件当成一条输入记录
 
主要参考了源代码中 LineRecordReader 的实现，有些细节还没有完全理解。

WholeFileInputFormat    import java.io.IOException;    import org.apache.Hadoop.fs.Path;  import org.apache.hadoop.io.BytesWritable;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.InputSplit;  import org.apache.hadoop.mapreduce.JobContext;  import org.apache.hadoop.mapreduce.RecordReader;  import org.apache.hadoop.mapreduce.TaskAttemptContext;  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;      public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {        @Override      public RecordReader<Text, BytesWritable> createRecordReader(              InputSplit arg0, TaskAttemptContext arg1) throws IOException,              InterruptedException {          // TODO Auto-generated method stub          return new WholeFileRecordReader();      }        @Override      protected boolean isSplitable(JobContext context, Path filename) {          // TODO Auto-generated method stub          return false;      }    }    WholeFileRecordReader    import java.io.IOException;    import org.apache.hadoop.conf.Configuration;  import org.apache.hadoop.fs.FSDataInputStream;  import org.apache.hadoop.fs.FSDataOutputStream;  import org.apache.hadoop.fs.FileSystem;  import org.apache.hadoop.fs.Path;  import org.apache.hadoop.io.BytesWritable;  import org.apache.hadoop.io.IOUtils;  import org.apache.hadoop.io.Text;  import org.apache.hadoop.mapreduce.InputSplit;  import org.apache.hadoop.mapreduce.RecordReader;  import org.apache.hadoop.mapreduce.TaskAttemptContext;  import org.apache.hadoop.mapreduce.lib.input.FileSplit;  import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;      public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {        private FileSplit fileSplit;      private FSDataInputStream fis;            private Text key = null;      private BytesWritable value = null;        private boolean processed = false;              @Override      public void close() throws IOException {          
// TODO Auto-generated method stub          //fis.close();      }        @Override      public Text getCurrentKey() throws IOException, InterruptedException {          // TODO Auto-generated method stub          return this.key;      }        @Override      public BytesWritable getCurrentValue() throws IOException,              InterruptedException {          // TODO Auto-generated method stub          return this.value;      }                @Override      public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)              throws IOException, InterruptedException {                    fileSplit = (FileSplit) inputSplit;          Configuration job = tacontext.getConfiguration();          Path file = fileSplit.getPath();          FileSystem fs = file.getFileSystem(job);          fis = fs.open(file);      }        @Override      public boolean nextKeyValue() {                    if(key == null){              key = new Text();          }                    if(value == null){              value = new BytesWritable();          }                    if(!processed){              byte[] content = new byte[(int) fileSplit.getLength()];                            Path file = fileSplit.getPath();                            System.out.println(file.getName());              key.set(file.getName());                            try {                  IOUtils.readFully(fis, content, 0, content.length);                  //value.set(content, 0, content.length);                  value.set(new BytesWritable(content));              } catch (IOException e) {                  // TODO Auto-generated catch block                  e.printStackTrace();              } finally{                  IOUtils.closeStream(fis);              }                                            processed = true;              return true;          }                    return false;      }        @Override      public float getProgress() throws IOException, InterruptedException {          // TODO 
Auto-generated method stub          return processed? fileSplit.getLength():0;      }            }    验证  public static class mapper extends Mapper<Text, BytesWritable, Text, Text>{            @Override          protected void map(Text key, BytesWritable value, Context context)                  throws IOException, InterruptedException {              // TODO Auto-generated method stub                context.write(key, new Text(value.getBytes()));          }      }    note:value是BytesWritable类型,显示为十六进制数, new Text(value.getBytes()) 变成字符串形式  自定义InputFormat 新版API 把真个文件当成一条输入

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/c5aa2624201f48c7adc924ad663e1d29.html