How to Submit a YARN MapReduce Job from a Java Program

Driven by a project requirement, we needed to submit MapReduce jobs to YARN from a Java program. Unlike the usual route of submitting a MapReduce job as a jar package, programmatic submission requires a few small changes; see the code below for details.

Below is the main MapReduce program. A few points are worth noting:

1. The input format is set to WholeFileInputFormat, meaning each input file is read whole and never split (a sketch of this class follows the main program listing below).

2. To control the reduce phase, the map output key is a composite key. Instead of the usual <key, value>, the map output takes the form <TextPair, value>, where TextPair has the form <key1, key2>.

3. To accommodate the composite key, the grouping function (GroupComparator) is redefined. The grouping rule: as long as the key1 fields of two TextPairs are equal (key2 need not match), their records are sent to the same reduce call. Once all records sharing key1 reach the reducer, key2 serves to identify each record within the group.

package web.Hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class GEMIMain {
 
 public GEMIMain(){
  job = null;
 }
 
 public Job job;
 public static class NamePartitioner extends
   Partitioner<TextPair, BytesWritable> {
  @Override
  public int getPartition(TextPair key, BytesWritable value,
    int numPartitions) {
   // Mask the sign bit rather than using Math.abs(), which can return a
   // negative value when the scaled hash overflows to Integer.MIN_VALUE.
   return (key.getFirst().hashCode() * 127 & Integer.MAX_VALUE) % numPartitions;
  }
 }

 /**
  * Grouping comparator: as long as two TextPairs share the same first key,
  * they belong to the same group. Their values are collected into a single
  * value iterator, which is then passed to the Reducer's reduce() method.
  *
  * @author hduser
  */
 public static class GroupComparator extends WritableComparator {
  public GroupComparator() {
   super(TextPair.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
   TextPair t1 = (TextPair) a;
   TextPair t2 = (TextPair) b;
   // Compare only the first key: records whose first keys compare equal
   // (result 0) fall into the same group, regardless of the second key.
   return t1.getFirst().compareTo(t2.getFirst());
  }
 }
 
 
 public boolean runJob(String[] args) throws IOException,
   ClassNotFoundException, InterruptedException {
 
  Configuration conf = new Configuration();
  // Store the output path in conf so that the reduce function can read it.
  conf.set("outputPath", args[args.length - 1]);
  // HDFS folder holding the quality files each run produces; it is the
  // second-to-last element of the args array.
  conf.set("qualityFolder", args[args.length - 2]);
  // When running inside a server, obtain the web project's root path; when
  // debugging as a plain Java application, read the configuration files
  // under /opt/hadoop-2.5.0/etc/hadoop/.
  //MapReduceProgress mprogress = new MapReduceProgress();
  //String rootPath= mprogress.rootPath;
  String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
  conf.addResource(new Path(rootPath+"yarn-site.xml"));
  conf.addResource(new Path(rootPath+"core-site.xml"));
  conf.addResource(new Path(rootPath+"hdfs-site.xml"));
  conf.addResource(new Path(rootPath+"mapred-site.xml"));
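  // A hedged alternative (not in the original post): the same settings can
  // be supplied programmatically instead of loading the *.xml resources
  // above. The host value below is an assumption for illustration only.
  // conf.set("fs.defaultFS", "hdfs://192.168.168.101:9000");
  // conf.set("mapreduce.framework.name", "yarn");
  // conf.set("yarn.resourcemanager.hostname", "192.168.168.101");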
  this.job = Job.getInstance(conf);
 
  job.setJobName("Job name:" + args[0]);
  job.setJarByClass(GEMIMain.class);

  job.setMapperClass(GEMIMapper.class);
  job.setMapOutputKeyClass(TextPair.class);
  job.setMapOutputValueClass(BytesWritable.class);
  // Set the partitioner
  job.setPartitionerClass(NamePartitioner.class);
  // After partitioning, group records according to the rule above
  job.setGroupingComparatorClass(GroupComparator.class);

  job.setReducerClass(GEMIReducer.class);

  job.setInputFormatClass(WholeFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  // job.setOutputKeyClass(NullWritable.class);
  // job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(8);
 
 
  // Add the input paths for the computation
  for (int i = 1; i < args.length - 2; i++) {
   FileInputFormat.addInputPath(job, new Path(args[i]));
  }
  // The last element of the args array is the output path
  FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
  boolean flag = job.waitForCompletion(true);
  return flag;
 }
 
 public static void main(String[] args) throws ClassNotFoundException,
   IOException, InterruptedException { 
 
  // args layout: [0] job name, [1..n-3] input paths,
  // [n-2] quality folder, [n-1] output path
  String[] inputPaths = new String[] { "normalizeJob",
    "hdfs://192.168.168.101:9000/user/hduser/red1/",
    "hdfs://192.168.168.101:9000/user/hduser/nir1/", "quality11111",
    "hdfs://192.168.168.101:9000/user/hduser/test" };
  GEMIMain test = new GEMIMain();
  boolean result = test.runJob(inputPaths);
  System.exit(result ? 0 : 1);
}
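
The driver above sets WholeFileInputFormat as the input format, but that class is not shown in the original post. Below is a minimal sketch of the usual "whole file as a single record" pattern: splitting is disabled, and a RecordReader delivers each file's full contents to the mapper as one BytesWritable. The NullWritable key type and the class layout are assumptions here; the project's actual implementation may differ as long as its value type matches what GEMIMapper expects.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

 @Override
 protected boolean isSplitable(JobContext context, Path file) {
  // Never split: each file becomes exactly one record.
  return false;
 }

 @Override
 public RecordReader<NullWritable, BytesWritable> createRecordReader(
   InputSplit split, TaskAttemptContext context)
   throws IOException, InterruptedException {
  WholeFileRecordReader reader = new WholeFileRecordReader();
  reader.initialize(split, context);
  return reader;
 }

 // Reads the entire file backing a split into a single BytesWritable value.
 public static class WholeFileRecordReader
   extends RecordReader<NullWritable, BytesWritable> {
  private FileSplit fileSplit;
  private Configuration conf;
  private final BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
   this.fileSplit = (FileSplit) split;
   this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
   if (processed) {
    return false;
   }
   byte[] contents = new byte[(int) fileSplit.getLength()];
   Path file = fileSplit.getPath();
   FileSystem fs = file.getFileSystem(conf);
   FSDataInputStream in = null;
   try {
    in = fs.open(file);
    IOUtils.readFully(in, contents, 0, contents.length);
    value.set(contents, 0, contents.length);
   } finally {
    IOUtils.closeStream(in);
   }
   processed = true;
   return true;
  }

  @Override
  public NullWritable getCurrentKey() {
   return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() {
   return value;
  }

  @Override
  public float getProgress() {
   return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() {
   // Nothing to release; the stream is closed in nextKeyValue().
  }
 }
}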

Below is the TextPair class:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {
 private Text first;
 private Text second;

 public TextPair() {
  set(new Text(), new Text());
 }

 public TextPair(String first, String second) {
  set(new Text(first), new Text(second));
 }

 public TextPair(Text first, Text second) {
  set(first, second);
 }

 public void set(Text first, Text second) {
  this.first = first;
  this.second = second;
 }

 public Text getFirst() {
  return first;
 }

 public Text getSecond() {
  return second;
 }

 @Override
 public void write(DataOutput out) throws IOException {
  first.write(out);
  second.write(out);
 }

 @Override
 public void readFields(DataInput in) throws IOException {
  first.readFields(in);
  second.readFields(in);
 }

 @Override
 public int hashCode() {
  return first.hashCode() * 163 + second.hashCode();
 }

 @Override
 public boolean equals(Object o) {
  if (o instanceof TextPair) {
   TextPair tp = (TextPair) o;
   return first.equals(tp.first) && second.equals(tp.second);
  }
  return false;
 }

 // Sort by the first key, then by the second, so that records within one
 // group arrive at the reducer ordered by key2.
 @Override
 public int compareTo(TextPair tp) {
  int cmp = first.compareTo(tp.first);
  if (cmp != 0) {
   return cmp;
  }
  return second.compareTo(tp.second);
 }
}
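
The driver also references GEMIMapper and GEMIReducer, neither of which appears in the original post. The sketches below only illustrate the shapes implied by the job wiring: the mapper emits <TextPair, BytesWritable> records whose key2 tags each record's origin, and the reducer receives every record sharing key1 in a single call, using key2 to tell the values apart. The tagging scheme (file name as key1, parent folder name as key2) and the reducer body are assumptions for illustration; each class would live in its own source file.

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical mapper shape: input types match WholeFileInputFormat above,
// output types match the driver's setMapOutputKeyClass/setMapOutputValueClass.
public class GEMIMapper
  extends Mapper<NullWritable, BytesWritable, TextPair, BytesWritable> {

 @Override
 protected void map(NullWritable key, BytesWritable value, Context context)
   throws IOException, InterruptedException {
  FileSplit split = (FileSplit) context.getInputSplit();
  String fileName = split.getPath().getName();           // assumed key1
  String folder = split.getPath().getParent().getName(); // assumed key2, e.g. "red1" or "nir1"
  // key1 groups related files together; key2 records where this file came from.
  context.write(new TextPair(fileName, folder), value);
 }
}

// Hypothetical reducer shape: because GroupComparator compares key1 only, all
// records sharing key1 arrive in one reduce() call. Hadoop refreshes the key
// object as the iterator advances, so key.getSecond() identifies the record
// currently being iterated. Output types are NullWritable to match the
// driver's NullOutputFormat; results would be written to HDFS directly.
public class GEMIReducer
  extends Reducer<TextPair, BytesWritable, NullWritable, NullWritable> {

 @Override
 protected void reduce(TextPair key, Iterable<BytesWritable> values, Context context)
   throws IOException, InterruptedException {
  String outputPath = context.getConfiguration().get("outputPath");
  String qualityFolder = context.getConfiguration().get("qualityFolder");
  for (BytesWritable value : values) {
   String tag = key.getSecond().toString(); // key2 of the current record
   // ... process value according to tag; write results under outputPath,
   //     consulting quality files under qualityFolder (set by the driver) ...
  }
 }
}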
