5 Observer编码示例
一般来说,对数据库建立索引,往往需要单独的数据结构来存储索引的数据。在hbase表中,除了使用rowkey索引数据外,还可以另外建立一张索引表,查询时先查询索引表,然后用查询结果查询数据表。下面这个示例演示如何使用Observer协处理器生成HBase表的二级索引:将数据表ob_table中列info:name的值作为索引表index_ob_table的rowkey,将数据表ob_table中列info:score的值作为索引表index_ob_table中列info:score的值建立二级索引,当用户向数据表中插入数据时,索引表将自动插入二级索引,从而为查询业务数据提供了便利。
5.1 代码在项目中新建类PutObserver作为Observer协处理器应用逻辑类,代码如下:
package com.hbase.demo.observer;
import java.io.IOException;
import java.util.List;
import org.apache.Hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
* @author developer
* 说明:hbase协处理器observer的应用逻辑代码
* 功能:在应用了该observer的hbase表中,所有的put操作,都会将每行数据的info:name列值作为rowkey、info:score列值作为value
* 写入另一张二级索引表index_ob_table,可以提高对于特定字段的查询效率
*/
@SuppressWarnings("deprecation")
public class PutObserver extends BaseRegionObserver{
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
Put put, WALEdit edit, Durability durability) throws IOException {
// 获取二级索引表
HTableInterface table = e.getEnvironment().getTable(TableName.valueOf("index_ob_table"));
// 获取值
List<Cell> cellList1 = put.get(Bytes.toBytes("info"), Bytes.toBytes("name"));
List<Cell> cellList2 = put.get(Bytes.toBytes("info"), Bytes.toBytes("score"));
// 将数据插入二级索引表
for (Cell cell1 : cellList1) {
// 列info:name的值作为二级索引表的rowkey
Put indexPut = new Put(CellUtil.cloneValue(cell1));
for (Cell cell2 : cellList2) {
// 列info:score的值作为二级索引表中列info:score的值
indexPut.add(Bytes.toBytes("info"), Bytes.toBytes("score"), CellUtil.cloneValue(cell2));
}
// 数据插入二级索引表
table.put(indexPut);
}
// 关闭资源
table.close();
}
}
// 将PutObserver类打包后上传到HDFS
$ hadoopfs -put ovserver_put.jar /input
// 启动hbase shell
$hbase shell
// 创建数据表ob_table
> create'ob_table','info'
// 创建二级索引表ob_table
> create'index_ob_table','info'
// 加载协处理器
>disable 'ob_table'
> alter'ob_table',METHOD =>'table_att','coprocessor' =>'hdfs://localhost:9000/input/observer_put.jar|com.hbase.demo.observer.PutObserver|100'
> enable'ob_table'
// 查看数据表ob_table
> describe'ob_table'
5.3 测试// 在eclipse项目中编写一个客户端,向数据表ob_table中插入测试数据
package com.hbase.demo.observer;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
public class Test {
public static void main(String[] args) throws IOException {
// 配置HBse
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", "2222");
// 建立一个数据库的连接
Connection conn = ConnectionFactory.createConnection(conf);
// 获取表
HTable table = (HTable) conn.getTable(TableName.valueOf("ob_table"));
// 插入测试数据
Put put = new Put(Bytes.toBytes("rowkey01"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("carl"));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("score"), Bytes.toBytes("92"));
table.put(put);
// 关闭资源
table.close();
conn.close();
}
}
// 插入数据后,在hbase shell中查看数据表ob_table中的数据
$hbase shell
> scan'ob_table'
//在hbase shell中查看二级索引表index_ob_table中的数据
> scan'index_ob_table'
Ubuntu Server 14.04 下 Hbase数据库安装
HBase 结点之间时间不一致造成regionserver启动失败
CentOS 6.3下HBase伪分布式平台搭建