HBase多线程建立HTable问题(2)

wormhole的reader和writer会分别起一个ThreadPoolExecutor,出错是在writer端的flush阶段,也就是最后一次批量插入操作。由于我的reader是每一个thread一个htable instance没有问题,而writer是共用了一个singleton HBaseClient,然后用ThreadLocal去保证每一个thread拥有一个本地htable对象,有可能有错误,最简单的方法是把writer端不用singleton HBaseClient,问题应该解决,不过没搞清root cause,不爽啊。。。

后来看了HTable和HAdmin的源代码才有点线索

public HTable(Configuration conf, final byte [] tableName)
  throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
    if (conf == null) {
      this.connection = null;
      return;
    }
    this.connection = HConnectionManager.getConnection(conf);
    this.configuration = conf;
    int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
    if (maxThreads == 0) {
      maxThreads = 1; // is there a better default?
    }
    long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
    ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
    this.finishSetup();
  }

每一个HTable instance都有一个HConnection对象,它负责与Zookeeper和之后的HBase Cluster建立链接(比如cluster中定位region,locations的cache,当region移动后重新校准),它由HConnectionManager来管理

public static HConnection getConnection(Configuration conf)
  throws ZooKeeperConnectionException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (HBASE_INSTANCES) {
      HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = new HConnectionImplementation(conf, true);
        HBASE_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
  }

HConnectionManager内部有LRU MAP => HBASE_INSTANCES的静态变量作为cache,key为HConnectionKey,包含了username和指定的properties(由传进去的conf提取), value就是HConnection具体实现HConnectionImplementation,由于传入进去的conf都一样,所以都指向同一个HConnectionImplementation,最后会调用connection.incCount()将client reference count加1

public void close() throws IOException {
if (this.closed) {
return;
}
flushCommits();
if (cleanupPoolOnClose) {
this.pool.shutdown();
}
if (cleanupConnectionOnClose) {
if (this.connection != null) {
this.connection.close();
}
}
this.closed = true;
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/a4ab11e55563448767fc3139c5c71771.html