客户端读写数据是先从HBase Master获取RegionServer的元数据信息,比如Region地址信息。在执行数据写操作时,HBase会先写MetaStore,为什么会写到MetaStore。本篇博客将为读者剖析HBase MetaStore和Compaction的详细内容。
2.内容HBase的内部通信和数据交互是通过RPC来实现,关于HBase的RPC实现机制下篇博客为大家分享。客户端应用程序通过RPC调用HBase服务端的写入、删除、读取等请求,由HBase的Master分配对应的RegionServer进行处理,获取每个RegionServer中的Region地址,写入到HFile文件中,最终进行数据持久化。
在了解HBase MetaStore之前,我们可以先来看看RegionServer的体系结构,其结构图如下所示:
在HBase存储中,虽然Region是分布式存储的最小单元,单并不是存储的最小单元。从图中可知,事实上Region是由一个或者多个Store构成的,每个Store保存一个列族(Columns Family)。而每个Store又由一个MemStore和0到多个StoreFile构成,而StoreFile以HFile的格式最终保存在HDFS上。
2.1 写入流程HBase为了保证数据的随机读取性能,在HFile中存储RowKey时,按照顺序存储,即有序性。在客户端的请求到达RegionServer后,HBase为了保证RowKey的有序性,不会将数据立即写入到HFile中,而是将每个执行动作的数据保存在内存中,即MetaStore中。MetaStore能够很方便的兼容操作的随机写入,并且保证所有存储在内存中的数据是有序的。当MetaStore到达阀值时,HBase会触发Flush机制,将MetaStore中的数据Flush到HFile中,这样便能充分利用HDFS写入大文件的性能优势,提供数据的写入性能。
整个读写流程,如下所示:
由于MetaStore是存储放在内存中的,如果RegionServer由于出现故障或者进程宕掉,会导致内存中的数据丢失。HBase为了保证数据的完整性,这存储设计中添加了一个WAL机制。每当HBase有更新操作写数据到MetaStore之前,会写入到WAL中(Write AHead Log的简称)。WAL文件会通过追加和顺序写入,WAL的每个RegionServer只有一个,同一个RegionServer上的所有Region写入到同一个WAL文件中。这样即使某一个RegionServer宕掉,也可以通过WAL文件,将所有数据按照顺序重新加载到内容中。
2.2 读取流程HBase查询通过RowKey来获取数据,客户端应用程序根据对应的RowKey来获取其对应的Region地址。查找Region的地址信息是通过HBase的元数据表来获取的,即hbase:meta表所在的Region。通过读取hbase:meta表可以找到每个Region的StartKey、EndKey以及所属的RegionServer。由于HBase的RowKey是有序分布在Region上,所以通过每个Region的StartKey和EndKey来确定当前操作的RowKey的Region地址。
由于扫描hbase:meta表会比较耗时,所以客户端会存储表的Region地址信息。当请求的Region租约过期时,会重新加载表的Region地址信息。
2.3 Flush机制RegionServer将数据写入到HFile中不是同步发生的,是需要在MetaStore的内存到达阀值时才会触发。RegionServer中所有的Region的MetaStore的内存占用量达到总内存的设置占用量之后,才会将MetaStore中的所有数据写入到HFile中。同时会记录以及写入的数据的顺序ID,便于WAL的日志清理机制定时删除WAL的无用日志。
MetaStore大小到达阀值后会Flush到磁盘中,关键参数由hbase.hregion.memstore.flush.size属性配置,默认是128MB。在Flush的时候,不会立即去Flush到磁盘,会有一个检测的过程。通过MemStoreFlusher类来实现,具体实现代码如下所示:
private boolean flushRegion(final FlushRegionEntry fqe) { HRegion region = fqe.region; if (!region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(region)) { if (fqe.isMaximumWait(this.blockingWaitTime)) { LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + "ms on a compaction to clean up 'too many store files'; waited " + "long enough... proceeding with flush of " + region.getRegionNameAsString()); } else { // If this is first time we've been put off, then emit a log message. if (fqe.getRequeueCount() <= 0) { // Note: We don't impose blockingStoreFiles constraint on meta regions LOG.warn("Region " + region.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { this.server.compactSplitThread.requestSystemCompaction( region, Thread.currentThread().getName()); } catch (IOException e) { LOG.error( "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()), RemoteExceptionHandler.checkIOException(e)); } } } // Put back on the queue. Have it come back out of the queue // after a delay of this.blockingWaitTime / 100 ms. this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100)); // Tell a lie, it's not flushed but it's ok return true; } } return flushRegion(region, false, fqe.isForceFlushAllStores()); }