How to Submit a YARN MapReduce Job from a Java Program (2)

 @Override
 public boolean equals(Object o) {
  if (o instanceof TextPair) {
   TextPair tp = (TextPair) o;
   return first.equals(tp.first) && second.equals(tp.second);
  }
  return false;
 }

 @Override
 public String toString() {
  return first + "\t" + second;
 }

 /**
  * A.compareTo(B):
  * returns 0 if A and B are equal,
  * a positive value if A is greater than B,
  * and a negative value if A is less than B.
  */
 @Override
 public int compareTo(TextPair tp) {
  int cmp = first.compareTo(tp.first);
  if (cmp != 0) {
   return cmp;
  }
  // Ties on the first field fall back to the second field, giving ascending order.
  return second.compareTo(tp.second);
 }
}
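
To make the ordering concrete, here is a small usage sketch. It assumes the TextPair(Text, Text) constructor defined in part 1 of this series; the class name TextPairDemo and the sample values are illustrative only.

import org.apache.hadoop.io.Text;

public class TextPairDemo {
 public static void main(String[] args) {
  // Sample values are made up for illustration.
  TextPair a = new TextPair(new Text("file1"), new Text("2014"));
  TextPair b = new TextPair(new Text("file1"), new Text("2015"));
  TextPair c = new TextPair(new Text("file2"), new Text("2014"));

  System.out.println(a.compareTo(b)); // negative: first fields tie, so "2014" < "2015" decides
  System.out.println(b.compareTo(c)); // negative: "file1" < "file2" decides
  System.out.println(a.equals(new TextPair(new Text("file1"), new Text("2014")))); // true
 }
}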

Next is WholeFileInputFormat, which keeps the input data from being split during the MapReduce job:

package web.hadoop;

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        // Hand every split to a reader that returns the whole file as a single record.
        return new WholeFileRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Returning false keeps each input file in one split, so it is never cut up.
        return false;
    }
}
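
Although the job-submission code belongs to part 1 of this series, the sketch below shows where this class plugs into a driver. The class name WholeFileDriver, the output format, and the argument handling are assumptions for illustration, not code from the original article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class WholeFileDriver {
 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf, "whole-file-demo");
  job.setJarByClass(WholeFileDriver.class);

  // Register the non-splitting input format: each mapper then receives one
  // whole file as a single (file name, file bytes) record.
  job.setInputFormatClass(WholeFileInputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(BytesWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}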

Next is the WholeFileRecordReader class:

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

 private FileSplit fileSplit;
 private FSDataInputStream fis;

 private Text key = null;
 private BytesWritable value = null;

 private boolean processed = false;

 @Override
 public void close() throws IOException {
  // The input stream is already closed in nextKeyValue(), so nothing is released here.
  // fis.close();
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  // The key is the file name, set in nextKeyValue().
  return this.key;
 }
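
 // The methods below are a sketch, not part of the listing above: they complete the
 // RecordReader following the standard whole-file pattern, in which the entire file
 // is emitted as a single (file name, file bytes) record.

 @Override
 public BytesWritable getCurrentValue() throws IOException, InterruptedException {
  // The value is the entire file content, set in nextKeyValue().
  return this.value;
 }

 @Override
 public float getProgress() throws IOException, InterruptedException {
  // Either the single record has been produced (1.0) or it has not (0.0).
  return processed ? 1.0f : 0.0f;
 }

 @Override
 public void initialize(InputSplit split, TaskAttemptContext context)
   throws IOException, InterruptedException {
  // Remember the split and open the underlying file.
  this.fileSplit = (FileSplit) split;
  Configuration conf = context.getConfiguration();
  Path file = fileSplit.getPath();
  FileSystem fs = file.getFileSystem(conf);
  this.fis = fs.open(file);
 }

 @Override
 public boolean nextKeyValue() throws IOException, InterruptedException {
  if (processed) {
   // Only one record per file: signal end of input on the second call.
   return false;
  }
  byte[] contents = new byte[(int) fileSplit.getLength()];
  key = new Text(fileSplit.getPath().getName());
  value = new BytesWritable();
  try {
   // Read the whole file into memory and hand it over as a single value.
   IOUtils.readFully(fis, contents, 0, contents.length);
   value.set(contents, 0, contents.length);
  } finally {
   IOUtils.closeStream(fis);
  }
  processed = true;
  return true;
 }
}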
