在项目中新建类SumClient作为调用RPC服务的客户端测试程序,代码如下:
package com.hbase.demo.endpoint;
import java.io.IOException;
import java.util.Map;
import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import com.google.protobuf.ServiceException;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
/**
* @author developer
* 说明:hbase协处理器endpooint的客户端代码
* 功能:从服务端获取对hbase表指定列的数据的求和结果
*/
public class SumClient {
public static void main(String[] args) throws ServiceException, Throwable {
long sum = 0L;
// 配置HBse
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set("hbase.zookeeper.property.clientPort", "2222");
// 建立一个数据库的连接
Connection conn = ConnectionFactory.createConnection(conf);
// 获取表
HTable table = (HTable) conn.getTable(TableName.valueOf("sum_table"));
// 设置请求对象
final SumRequest request = SumRequest.newBuilder().setFamily("info").setColumn("score").build();
// 获得返回值
Map<byte[], Long> result = table.coprocessorService(SumService.class, null, null,
new Batch.Call<SumService, Long>() {
@Override
public Long call(SumService service) throws IOException {
BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
service.getSum(null, request, rpcCallback);
SumResponse response = (SumResponse) rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
});
// 将返回值进行迭代相加
for (Long v : result.values()) {
sum += v;
}
// 结果输出
System.out.println("sum: " + sum);
// 关闭资源
table.close();
conn.close();
}
}
4.3 加载Endpoint// 将Sum类和SumEndPoint类打包后上传到HDFS
$ hadoopfs -put endpoint_sum.jar /input
// 修改hbase配置文件,添加配置
$ vimapp/hbase-1.2.0-cdh5.7.1/conf/hbase-site.xml
<property>
<name>hbase.table.sanity.checks</name>
<value>false</value>
</property>
// 重启hbase
$stop-hbase.sh
$start-hbase.sh
// 启动hbase shell
$hbase shell
// 创建表sum_table
> create'sum_table','info'
// 插入测试数据
> put'sum_table','rowkey01','info:score','95'
> put'sum_table','rowkey02','info:score','98'
> put'sum_table','rowkey02','info:age','20'
// 查看数据
> scan'sum_table'
// 加载协处理器
>disable 'sum_table'
> alter'sum_table',METHOD =>'table_att','coprocessor' =>'hdfs://localhost:9000/input/endpoint_sum.jar|com.hbase.demo.endpoint.SumEndPoint|100'
>enable 'sum_table'
// 如果要卸载协处理器,可以先查看表中协处理器名,然后通过命令卸载
>disable 'sum_table'
> describe'sum_table'
> alter'sum_table',METHOD =>'table_att_unset',NAME=>'coprocessor$1'
> enable'sum_table'
4.4 测试在eclipse中运行客户端程序SumClient,输出结果为193,正好符合预期,如下图所示: