HBase的协处理器编码实战(2)

在项目中新建类SumClient作为调用RPC服务的客户端测试程序,代码如下:

package com.hbase.demo.endpoint;

import java.io.IOException;
import java.util.Map;

import org.apache.Hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;

import com.google.protobuf.ServiceException;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;


/**
 * @author developer
 * 说明:hbase协处理器endpooint的客户端代码
 * 功能:从服务端获取对hbase表指定列的数据的求和结果
 */
public class SumClient {

public static void main(String[] args) throws ServiceException, Throwable {
       
        long sum = 0L;
       
        // 配置HBse
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2222");
        // 建立一个数据库的连接
        Connection conn = ConnectionFactory.createConnection(conf);
        // 获取表
        HTable table = (HTable) conn.getTable(TableName.valueOf("sum_table"));
        // 设置请求对象
        final SumRequest request = SumRequest.newBuilder().setFamily("info").setColumn("score").build();
        // 获得返回值
        Map<byte[], Long> result = table.coprocessorService(SumService.class, null, null,
                new Batch.Call<SumService, Long>() {

@Override
                    public Long call(SumService service) throws IOException {
                        BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
                        service.getSum(null, request, rpcCallback);
                        SumResponse response = (SumResponse) rpcCallback.get();
                        return response.hasSum() ? response.getSum() : 0L;
                    }
        });
        // 将返回值进行迭代相加
        for (Long v : result.values()) {
            sum += v;
        }
        // 结果输出
        System.out.println("sum: " + sum);
        // 关闭资源
        table.close();
        conn.close();
    }

}

4.3 加载Endpoint

// 将Sum类和SumEndPoint类打包后上传到HDFS

$ hadoopfs -put endpoint_sum.jar /input

// 修改hbase配置文件,添加配置

$ vimapp/hbase-1.2.0-cdh5.7.1/conf/hbase-site.xml

<property>

  <name>hbase.table.sanity.checks</name>

    <value>false</value>

  </property>

// 重启hbase

$stop-hbase.sh

$start-hbase.sh

// 启动hbase shell

$hbase shell

// 创建表sum_table

> create'sum_table','info'

// 插入测试数据

> put'sum_table','rowkey01','info:score','95'

> put'sum_table','rowkey02','info:score','98'

> put'sum_table','rowkey02','info:age','20'

// 查看数据

> scan'sum_table'

HBase的协处理器编码实战

// 加载协处理器

>disable 'sum_table'

> alter'sum_table',METHOD =>'table_att','coprocessor' =>'hdfs://localhost:9000/input/endpoint_sum.jar|com.hbase.demo.endpoint.SumEndPoint|100'

>enable 'sum_table'

HBase的协处理器编码实战

// 如果要卸载协处理器,可以先查看表中协处理器名,然后通过命令卸载

>disable 'sum_table'

> describe'sum_table'

> alter'sum_table',METHOD =>'table_att_unset',NAME=>'coprocessor$1'

> enable'sum_table'

HBase的协处理器编码实战

4.4 测试

在eclipse中运行客户端程序SumClient,输出结果为193,正好符合预期,如下图所示:

HBase的协处理器编码实战

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/14591.html