A Hive-Based File Format: An Introduction to RCFile and Its Applications (3)

5. How to Generate RCFile Files

After all of the above, you probably already know that RCFile is mainly used to improve Hive query efficiency. So how do you generate files in this format?

(1) Convert directly in Hive with an INSERT from a textfile table

For example:

insert overwrite table http_RCTable partition(dt='2013-09-30') select p_id,tm,idate,phone from tmp_testp where dt='2013-09-30';
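
Note that the target table http_RCTable must already be declared with RCFile as its storage format; the INSERT itself performs the row-to-columnar conversion on write. A minimal sketch of such a table definition, assuming the column names from the SELECT above and STRING types purely for illustration:

create table http_RCTable (p_id string, tm string, idate string, phone string)
partitioned by (dt string)
stored as rcfile;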

(2) Generate with MapReduce

So far, MapReduce itself does not provide a built-in API for RCFile; support instead comes from other projects in the Hadoop ecosystem such as Pig, Hive, and HCatalog. The reason is that, compared with textfile and other formats, RCFile offers no significant advantage for typical MapReduce workloads.

To avoid reinventing the wheel, the MapReduce code below that generates RCFile calls into the relevant Hive and HCatalog classes. Note that when you test this code, your Hadoop, Hive, and HCatalog versions must match, otherwise you can expect classpath and compatibility problems.


For example, I am using hive-0.10.0+198-1.cdh4.4.0, so the Hive and HCatalog jars should be downloaded in the matching versions for that release.

PS: The code below has been tested and works.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.rcfile.RCFileMapReduceOutputFormat;

public class TextToRCFile extends Configured implements Tool {

    /**
     * Mapper that parses each '|'-delimited text line into a
     * BytesRefArrayWritable, the row representation expected by
     * RCFileMapReduceOutputFormat.
     */
    public static class Map
            extends Mapper<Object, Text, NullWritable, BytesRefArrayWritable> {

        private byte[] fieldData;
        private int numCols;
        private BytesRefArrayWritable bytes;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The column count is read back from the same property that
            // RCFileMapReduceOutputFormat.setColumnNumber() sets in run().
            numCols = context.getConfiguration().getInt("hive.io.rcfile.column.number.conf", 0);
            bytes = new BytesRefArrayWritable(numCols);
        }

        @Override
        public void map(Object key, Text line, Context context)
                throws IOException, InterruptedException {
            bytes.clear();
            String[] cols = line.toString().split("\\|");
            System.out.println("SIZE : " + cols.length); // debug: parsed column count
            for (int i = 0; i < numCols; i++) {
                fieldData = cols[i].getBytes("UTF-8");
                BytesRefWritable cu = new BytesRefWritable(fieldData, 0, fieldData.length);
                bytes.set(i, cu);
            }
            context.write(NullWritable.get(), bytes);
        }
    }
 
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.out.println("Usage: " +
                    "hadoop jar RCFileLoader.jar <main class> " +
                    "-tableName <tableName> -numCols <numberOfColumns> -input <input path> " +
                    "-output <output path> -rowGroupSize <rowGroupSize> -ioBufferSize <ioBufferSize>");
            System.out.println("For test");
            System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
                    "-tableName test1 -numCols 10 -input RCFileLoaderTest/test1 " +
                    "-output RCFileLoaderTest/RCFile_test1");
            System.out.println("$HADOOP jar RCFileLoader.jar edu.osu.cse.rsam.rcfile.mapreduce.LoadTable " +
                    "-tableName test2 -numCols 5 -input RCFileLoaderTest/test2 " +
                    "-output RCFileLoaderTest/RCFile_test2");
            return 2;
        }

        // Parse command-line options; defaults are a 16 MB row group and a 128 KB io buffer.
        String tableName = "";
        int numCols = 0;
        String inputPath = "";
        String outputPath = "";
        int rowGroupSize = 16 * 1024 * 1024;
        int ioBufferSize = 128 * 1024;
        for (int i = 0; i < otherArgs.length - 1; i++) {
            if ("-tableName".equals(otherArgs[i])) {
                tableName = otherArgs[i + 1];
            } else if ("-numCols".equals(otherArgs[i])) {
                numCols = Integer.parseInt(otherArgs[i + 1]);
            } else if ("-input".equals(otherArgs[i])) {
                inputPath = otherArgs[i + 1];
            } else if ("-output".equals(otherArgs[i])) {
                outputPath = otherArgs[i + 1];
            } else if ("-rowGroupSize".equals(otherArgs[i])) {
                rowGroupSize = Integer.parseInt(otherArgs[i + 1]);
            } else if ("-ioBufferSize".equals(otherArgs[i])) {
                ioBufferSize = Integer.parseInt(otherArgs[i + 1]);
            }
        }

        conf.setInt("hive.io.rcfile.record.buffer.size", rowGroupSize);
        conf.setInt("io.file.buffer.size", ioBufferSize);

        Job job = new Job(conf, "RCFile loader: loading table " + tableName + " with " + numCols + " columns");

        job.setJarByClass(TextToRCFile.class);
        job.setMapperClass(Map.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesRefArrayWritable.class);
//        job.setNumReduceTasks(0);

        FileInputFormat.addInputPath(job, new Path(inputPath));

        // Write the output as RCFile through HCatalog's RCFileMapReduceOutputFormat.
        job.setOutputFormatClass(RCFileMapReduceOutputFormat.class);
        RCFileMapReduceOutputFormat.setColumnNumber(job.getConfiguration(), numCols);
        RCFileMapReduceOutputFormat.setOutputPath(job, new Path(outputPath));
        RCFileMapReduceOutputFormat.setCompressOutput(job, false);

        System.out.println("Loading table " + tableName + " from " + inputPath + " to RCFile located at " + outputPath);
        System.out.println("number of columns: " + job.getConfiguration().get("hive.io.rcfile.column.number.conf"));
        System.out.println("RCFile row group size: " + job.getConfiguration().get("hive.io.rcfile.record.buffer.size"));
        System.out.println("io buffer size: " + job.getConfiguration().get("io.file.buffer.size"));

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TextToRCFile(), args);
        System.exit(res);
    }

}
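
After the job finishes, the generated RCFile data can be read back in Hive by pointing an RCFile table at the output directory. The sketch below is only illustrative: the table name, column names and types, and the HDFS location are placeholders, and the column count must match the -numCols value passed to the job (10 in the test1 example above).

create external table rcfile_test1 (c1 string, c2 string, c3 string, c4 string, c5 string, c6 string, c7 string, c8 string, c9 string, c10 string)
stored as rcfile
location '/user/<your_user>/RCFileLoaderTest/RCFile_test1';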

