HBase's TableOutputFormat writes the output <key, value> pairs into the specified HBase table. This class does not write to the WAL (Write-Ahead Log), so data can be lost if a server fails. The MultiTableOutputFormat class addresses this problem: it lets you configure whether writes also go to the WAL.
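As a minimal sketch of how that setting is applied (the job name below is only a placeholder), the WAL behaviour is controlled through the WAL_PROPERTY constant defined on MultiTableOutputFormat before the job is submitted:

    // Keep the write-ahead log enabled when writing through MultiTableOutputFormat.
    // WAL_PROPERTY, WAL_ON and WAL_OFF are constants defined on MultiTableOutputFormat.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(MultiTableOutputFormat.WAL_PROPERTY, MultiTableOutputFormat.WAL_ON); // WAL_OFF disables the WAL

    Job job = Job.getInstance(conf, "multi-table-job"); // placeholder job name
    job.setOutputFormatClass(MultiTableOutputFormat.class);
    // With MultiTableOutputFormat, the reducer emits <ImmutableBytesWritable tableName, Mutation> pairs,
    // so a single job can write to several tables.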
To run an HBase program on a Hadoop cluster, the HBase class files must also be made available to the cluster, otherwise the job fails with a ClassNotFoundException. Concretely, you can reference HBASE_HOME and the HBase jars in Hadoop's environment configuration file hadoop-env.sh, or bundle the HBase jars directly into the application jar.
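For example, assuming HBase is installed under /usr/local/hbase (the path is only an assumption for illustration), the hadoop-env.sh approach looks roughly like this; the hbase mapredcp command prints the jars HBase needs for MapReduce jobs:

    # In $HADOOP_HOME/etc/hadoop/hadoop-env.sh
    export HBASE_HOME=/usr/local/hbase
    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$($HBASE_HOME/bin/hbase mapredcp)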
The following example combines MapReduce and HBase in a WordCount program: it reads data from the specified input files, computes the word counts, and finally stores the results in HBase:
package com.hbase.demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class HBaseWordCount {

    public static class hBaseMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String w : words) {
                word.set(w);
                context.write(word, ONE);
            }
        }
    }

    public static class hBaseReducer extends TableReducer<Text, IntWritable, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // Create a Put: each word is stored as one row, keyed by the word itself
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Column family "content", qualifier "count", value = number of occurrences
            put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(NullWritable.get(), put);
        }
    }

    // Create the HBase table
    public static void createHBaseTable(String tableName) throws IOException {
        // Configure HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        // Open a connection to the cluster
        Connection conn = ConnectionFactory.createConnection(conf);
        // Obtain an administrative interface
        HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();
        // If the table already exists, drop it and recreate it
        if (hAdmin.tableExists(tableName)) {
            System.out.println("Table already exists, recreating it");
            hAdmin.disableTable(tableName);
            hAdmin.deleteTable(tableName);
        }
        // Build the table descriptor
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        // Add the column family to the descriptor
        tableDesc.addFamily(new HColumnDescriptor("content"));
        // Create the table
        hAdmin.createTable(tableDesc);
        System.out.println("Created table " + tableName);
        hAdmin.close();
        conn.close();
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 3) {
            System.out.println("Usage: HBaseWordCount <input> <jobName> <tableName>");
            System.exit(1);
        }
        String input = args[0];
        String jobName = args[1];
        String tableName = args[2];

        // Create the output table
        HBaseWordCount.createHBaseTable(tableName);

        // Configure MapReduce (alternatively, put the Hadoop and HBase
        // configuration files on the project classpath)
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("mapred.job.tracker", "localhost:9001");
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

        // Configure the job
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HBaseWordCount.class);
        job.setMapperClass(hBaseMapper.class);
        job.setReducerClass(hBaseReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(input));

        // Run the MapReduce job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
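Assuming the program has been packaged into a jar named hbase-wordcount.jar (the jar name and paths below are placeholders), the job takes the input path, the job name, and the target table name as its three arguments, and the result can be checked afterwards in the HBase shell:

    hadoop jar hbase-wordcount.jar com.hbase.demo.HBaseWordCount /input/words WordCount wordcount
    # after the job finishes, run the following inside the HBase shell (hbase shell):
    #   scan 'wordcount'

Each row of the wordcount table is then one word, with the column content:count holding the number of occurrences.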