An OpenTSDB compaction takes multiple columns in an HBase row and merges them into a single column to reduce disk space. This is not to be confused with HBase compactions where multiple edits to a region are merged into one. OpenTSDB compactions can occur periodically for a TSD after data has been written, or during a query.
tsd.storage.enable_compaction：是否开启压缩（默认为 true，即开启压缩）。
为了减少存储空间（理论上对查询也有好处），OpenTSDB 在写入时序数据的同时会把 rowkey 放入一个 ConcurrentSkipListMap（即 CompactionQueue）中；一个 daemon 线程不断检查 System.currentTimeMillis()/1000-3600-1（即当前时间往前推一小时零一秒）之前的数据能否被压缩。满足压缩条件时，它会把同一小时内的时序数据（它们的 rowkey 相同）查出来，在内存中压缩（compact）成一列，回写（write）到 HBase 中，然后删除（delete）之前的原始数据。此外，查询（query）操作也可能触发 compaction。代码见 CompactionQueue。
/**
 * Queue of row keys that are candidates for OpenTSDB compaction.
 * <p>
 * Row keys are the map keys and are kept sorted by {@link Cmp}: first by the
 * base_time embedded in the key, then by the whole key.
 */
final class CompactionQueue extends ConcurrentSkipListMap<byte[], Boolean> {
  /**
   * Creates the queue and, if compactions are enabled, starts the background
   * compaction thread.
   *
   * @param tsdb TSDB instance; supplies the widths used by {@link Cmp} and the
   *     config consulted via {@code enable_compactions()}
   */
  public CompactionQueue(final TSDB tsdb) {
    super(new Cmp(tsdb));
    // Only start the background thread when compactions are enabled in the config.
    // NOTE(review): the comment previously here named tsd.storage.enable_appends,
    // which does not match enable_compactions(); the matching setting appears to
    // be tsd.storage.enable_compaction.
    if (tsdb.config.enable_compactions()) {
      // Compaction is enabled: start the background compaction thread (described
      // as a daemon thread; startCompactionThread() itself is not shown here).
      startCompactionThread();
    }
  }

  /**
   * Helper to sort the byte arrays in the compaction queue.
   * <p>
   * This comparator sorts things by timestamp first, this way we can find
   * all rows of the same age at once.
   */
  private static final class Cmp implements Comparator<byte[]> {
    /**
     * The position with which the timestamp of metric starts: the salt width
     * plus {@code tsdb.metrics.width()}.
     */
    private final short timestamp_pos;

    public Cmp(final TSDB tsdb) {
      timestamp_pos = (short) (Const.SALT_WIDTH() + tsdb.metrics.width());
    }

    @Override
    public int compare(final byte[] a, final byte[] b) {
      // Compare only the Const.TIMESTAMP_BYTES bytes of base_time in each row key.
      final int c = Bytes.memcmp(a, b, timestamp_pos, Const.TIMESTAMP_BYTES);
      // If the timestamps are equal, sort according to the entire row key.
      return c != 0 ? c : Bytes.memcmp(a, b);
    }
  }
}

看看上面启动的daemon线程在做啥CompactionQueue#Thrd
/**
 * Background thread to trigger periodic compactions.
 * <p>
 * On each pass, if the queue holds more than {@code min_flush_threshold} rows,
 * it calls {@code flush()} for rows whose base_time is at least
 * {@code Const.MAX_TIMESPAN + 1} seconds old, with a budget of
 * {@code maxflushes}.
 * <p>
 * NOTE(review): as excerpted, the loop never sleeps and never exits, so it would
 * spin continuously. The wait between passes and any interrupt/error handling
 * appear to have been elided from this excerpt.
 */
final class Thrd extends Thread {
  public Thrd() {
    super("CompactionThread");
  }

  @Override
  public void run() {
    while (true) {
      final int size = size();
      // Only flush once the queue has grown past the minimum threshold.
      if (size > min_flush_threshold) {
        // Flush budget for this pass: proportional to the queue size and to
        // flush_interval * flush_speed, divided by MAX_TIMESPAN, but never
        // below min_flush_threshold.
        final int maxflushes = Math.max(min_flush_threshold,
            size * flush_interval * flush_speed / Const.MAX_TIMESPAN);
        final long now = System.currentTimeMillis();
        // Cut-off in seconds: now - MAX_TIMESPAN - 1. flush() stops at rows with
        // a newer base_time. MAX_TIMESPAN is presumably 3600 s (one row's span),
        // so only rows whose whole window has already passed are compacted.
        flush(now / 1000 - Const.MAX_TIMESPAN - 1, maxflushes);
      }
    }
  }
}

再看CompactionQueue#flush(final long cut_off, int maxflushes)
/**
 * Schedules compaction of queued rows whose base_time is at or before
 * {@code cut_off}.
 * <p>
 * Walks {@code keySet()} in {@code Cmp} order (base_time first). For each
 * eligible row it issues a read and attaches {@code compactcb}, which performs
 * the actual compaction.
 * <p>
 * NOTE(review): this excerpt omits parts of the method and would not compile
 * as shown:
 * <ul>
 *   <li>{@code nflushes} is never incremented, so the
 *       {@code max_concurrent_flushes} check only fires if that limit is 0.</li>
 *   <li>{@code maxflushes} is used only to presize {@code ds}.</li>
 *   <li>{@code seed} is unused.</li>
 *   <li>Nothing here removes flushed rows from the queue.</li>
 *   <li>{@code group} is never declared.</li>
 * </ul>
 *
 * @param cut_off latest base_time (seconds) eligible for compaction
 * @param maxflushes requested flush budget for this pass
 * @return presumably a deferred grouping the per-row deferreds in {@code ds};
 *     how {@code group} is built is not shown in this excerpt
 */
private Deferred<ArrayList<Object>> flush(final long cut_off, int maxflushes) {
  final ArrayList<Deferred<Object>> ds =
      new ArrayList<Deferred<Object>>(Math.min(maxflushes, max_concurrent_flushes));
  int nflushes = 0;
  int seed = (int) (System.nanoTime() % 3);
  for (final byte[] row : this.keySet()) {
    final long base_time = Bytes.getUnsignedInt(row, Const.SALT_WIDTH() + metric_width);
    if (base_time > cut_off) {
      // This row is newer than the cut-off. Keys are ordered by base_time first,
      // so every remaining row is newer too: stop here.
      break;
    } else if (nflushes == max_concurrent_flushes) {
      // Concurrency limit reached for this pass.
      break;
    }
    // Issue an HBase get for the row's cells. On success compactcb performs the
    // compaction; read failures go to handle_read_error.
    ds.add(tsdb.get(row).addCallbacks(compactcb, handle_read_error));
  }
  return group;
}

最后看一下compaction具体做了啥,见CompactionQueue#Compaction#compact()
/**
 * Merges the cells read for one row into a single compacted column, writes it
 * back, and then deletes the original cells.
 * <p>
 * NOTE(review): no return statement is shown for the {@code !write} path, so
 * this excerpt does not compile as-is; that branch appears to have been elided.
 *
 * @return when a write is needed, the deferred for the put, chained with the
 *     delete of the original cells if there are any
 */
public Deferred<Object> compact() {
  // merge the datapoints, ordered by timestamp and removing duplicates
  final ByteBufferList compacted_qual = new ByteBufferList(tot_values);
  final ByteBufferList compacted_val = new ByteBufferList(tot_values);
  mergeDatapoints(compacted_qual, compacted_val);
  // build the compacted columns
  final KeyValue compact = buildCompactedColumn(compacted_qual, compacted_val);
  // Helper (not shown) that decides whether the compacted column must be written.
  final boolean write = updateDeletesCheckForWrite(compact);
  final byte[] key = compact.key();
  deleted_cells.addAndGet(to_delete.size()); // We're going to delete this.
  if (write) {
    // Write the compacted column back to the tsdb table.
    Deferred<Object> deferred = tsdb.put(key, compact.qualifier(), compact.value(), compactedKVTimestamp);
    if (!to_delete.isEmpty()) {
      // Delete the original cells only after the compacted write succeeds.
      // On a failed put, handle_write_error runs instead of the delete callback.
      deferred = deferred.addCallbacks(new DeleteCompactedCB(to_delete), handle_write_error);
    }
    return deferred;
  }
}

/**
 * Callback that deletes the original (now compacted) cells once the compacted
 * column has been written. Errors from the delete go to handle_delete_error.
 * <p>
 * NOTE(review): the constructor taking {@code to_delete} (used in compact()) is
 * not shown in this excerpt.
 */
private final class DeleteCompactedCB implements Callback<Object, Object> {
  /** What we're going to delete. */
  private final byte[] key;
  /** Qualifiers of the cells to delete from {@code key}'s row. */
  private final byte[][] qualifiers;

  @Override
  public Object call(final Object arg) {
    // arg (the result of the preceding put) is ignored.
    return tsdb.delete(key, qualifiers).addErrback(handle_delete_error);
  }

  @Override
  public String toString() {
    return "delete compacted cells";
  }
}