Hbase自0.92之后开始支持Coprocessor(协处理器),旨在使用户可以将自己的代码放在regionserver上来运行,即将计算程序移动到数据所在的位置进行运算。这一点与MapReduce的思想一致。Hbase的Coprocess分为observer和endpoint两大类。简单说,observer相当于关系型数据库中的触发器,而endpoint则相当于关系型数据库中的存储过程。关于HBase Coprocessor的介绍网上有很多的文档,由于我也是刚刚学习,从很多好人贡献的文档上了解了很多。
这里记录一下自己在一个完全分布式系统上部署自定义的Coprocessor的过程,本文会介绍两种部署的方法:一种是在hbase-site.xml中配置;第二种是使用表描述符来配置(alter);前者会被所有的表的所有的region加载,而后者只会对指定的表的所有region加载。本文会结合自己的实验过程指出哪些地方为易错点。
Hadoop+HBase搭建云存储总结 PDF
HBase 结点之间时间不一致造成regionserver启动失败
首先,还是先来看下环境:
hadoop1.updb.com 192.168.0.101 Role:master
hadoop2.updb.com 192.168.0.102 Role:regionserver
hadoop3.updb.com 192.168.0.103 Role:regionserver
hadoop4.updb.com 192.168.0.104 Role:regionserver
hadoop5.updb.com 192.168.0.105 Role:regionserver
首先编码自定义的Coprocessor,该段代码摘自《Hbase权威指南》 PDF下载见 ,只是修改了package的名字:
/**
* coprocessor
* 当用户在使用get命令从表中取特定的row时,就会触发这个自定义的observer coprocessor
* 触发条件是用户使用get指定的rowkey与程序中指定的FIXED_ROW一致为@@@GETTIME@@@时
* 触发后的操作是程序会在服务端生成一个keyvalue实例,并将这个实例返回给客户端。这个kv实例是以
* @@@GETTIME@@@为rowkey,列族和列标识符均为@@@GETTIME@@@,列值为服务器端的时间
*/
package org.apache.hbase.kora.coprocessor;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
public class RegionObserverExample extends BaseRegionObserver {
public static final Log LOG = LogFactory.getLog(HRegion.class);
public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@");
@Override
public void preGet(ObserverContext<RegionCoprocessorEnvironment> c,
Get get, List<KeyValue> result) throws IOException {
LOG.debug("Got preGet for row: " + Bytes.toStringBinary(get.getRow()));
if (Bytes.equals(get.getRow(), FIXED_ROW)) {
KeyValue kv = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW,
Bytes.toBytes(System.currentTimeMillis()));
LOG.debug("Had a match, adding fake kv: " + kv);
result.add(kv);
}
}
}
编码完成后需要将该类编译并打成jar包,类名上右击--Export,弹出如下窗口
选择JAR file,然后Next,出现如下窗口