import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Created by shaobo on 15-6-9.
*/
public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
private String hbaseTable;
private String dataSeperator;
private String columnFamily1;
private String columnFamily2;
public void setup(Context context) {
Configuration configuration = context.getConfiguration();//获取作业参数
hbaseTable = configuration.get("hbase.table.name");
dataSeperator = configuration.get("data.seperator");
columnFamily1 = configuration.get("COLUMN_FAMILY_1");
columnFamily2 = configuration.get("COLUMN_FAMILY_2");
}
public void map(LongWritable key, Text value, Context context){
try {
String[] values = value.toString().split(dataSeperator);
ImmutableBytesWritable rowKey = new ImmutableBytesWritable(values[0].getBytes());
Put put = new Put(Bytes.toBytes(values[0]));
put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("month"), Bytes.toBytes(values[1]));
put.addColumn(Bytes.toBytes(columnFamily1), Bytes.toBytes("day"), Bytes.toBytes(values[2]));
for (int i = 3; i < values.length; ++i){
put.addColumn(Bytes.toBytes(columnFamily2), Bytes.toBytes("hour : " + i), Bytes.toBytes(values[i]));
}
context.write(rowKey, put);
} catch(Exception exception) {
exception.printStackTrace();
}
}
}
HFileLoader.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
/**
* Created by shaobo on 15-6-9.
*/
public class HFileLoader {
public static void doBulkLoad(String pathToHFile, String tableName){
try {
Configuration configuration = new Configuration();
HBaseConfiguration.addHbaseResources(configuration);
LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(configuration);
HTable hTable = new HTable(configuration, tableName);//指定表名
loadFfiles.doBulkLoad(new Path(pathToHFile), hTable);//导入数据
System.out.println("Bulk Load Completed..");
} catch(Exception exception) {
exception.printStackTrace();
}
}
}
程序编译打包,提交到Hadoop运行
HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar BulkLoad.jar inputpath outputpath1
上述命令用法可参考
作业运行情况: