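The purgeOldStorage() method of the class org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager implements the retention logic for fsimage files and edit logs:
1. Determine the txid of the oldest fsimage that must be retained (minImageTxId), and delete every fsimage whose txid is smaller than it.
2. Compute purgeLogsFrom = max(0, minImageTxId + 1 - dfs.namenode.num.extra.edits.retained). Edit log transactions with a txid below purgeLogsFrom are candidates for deletion; the transactions from purgeLogsFrom up to minImageTxId are kept as an extra cushion.
3. If the number of extra edit log segments kept this way exceeds dfs.namenode.max.extra.edits.segments.retained, drop the oldest extra segments one at a time, advancing purgeLogsFrom past each dropped segment, until the limit is met.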
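The arithmetic in steps 2 and 3 can be sketched in isolation, without any Hadoop dependency. The following is only an illustrative model, not the Hadoop implementation: the class RetentionSketch, the Segment type, and computePurgeLogsFrom() are hypothetical names, and the filter that picks the "extra" segments is an assumption about how stream selection behaves (a segment counts as extra if it still contains a txid >= purgeLogsFrom and starts before the first required txid).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class RetentionSketch {

    /** Hypothetical stand-in for one edit log segment covering [firstTxId, lastTxId]. */
    static final class Segment {
        final long firstTxId;
        final long lastTxId;

        Segment(long firstTxId, long lastTxId) {
            this.firstTxId = firstTxId;
            this.lastTxId = lastTxId;
        }
    }

    /**
     * Returns the txid below which edit logs may be purged.
     * numExtraEditsToRetain models dfs.namenode.num.extra.edits.retained,
     * maxExtraSegmentsToRetain models dfs.namenode.max.extra.edits.segments.retained.
     */
    static long computePurgeLogsFrom(long minImageTxId,
                                     long numExtraEditsToRetain,
                                     int maxExtraSegmentsToRetain,
                                     List<Segment> allSegments) {
        // Step 2: everything after the retained image is required; keep a cushion before it.
        long minimumRequiredTxId = minImageTxId + 1;
        long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

        // Assumption: "extra" segments are those that would survive the purge
        // but are not needed to replay on top of the retained image.
        List<Segment> extra = new ArrayList<>();
        for (Segment s : allSegments) {
            if (s.lastTxId >= purgeLogsFrom && s.firstTxId < minimumRequiredTxId) {
                extra.add(s);
            }
        }
        extra.sort(Comparator.comparingLong((Segment s) -> s.firstTxId)
                .thenComparingLong(s -> s.lastTxId));

        // Step 3: too many extra segments, so give up the oldest ones first.
        while (extra.size() > maxExtraSegmentsToRetain) {
            purgeLogsFrom = extra.get(0).lastTxId + 1;
            extra.remove(0);
        }
        return purgeLogsFrom;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
                new Segment(1, 100), new Segment(101, 200), new Segment(201, 300),
                new Segment(301, 400), new Segment(401, 500));

        // Oldest retained image is at txid 400, cushion of 250 txns, at most 2 extra segments.
        System.out.println(computePurgeLogsFrom(400, 250, 2, segments));  // prints 201
        // Same setup, but the segment limit is high enough not to apply.
        System.out.println(computePurgeLogsFrom(400, 250, 10, segments)); // prints 151
    }
}
```

In the first call the cushion alone would give purgeLogsFrom = 151, which leaves three extra segments (101-200, 201-300, 301-400); the limit of two pushes purgeLogsFrom forward to 201. In the second call the limit is not reached, so the value stays at 151.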
Full code:
public void purgeOldStorage() throws IOException {
    FSImageTransactionalStorageInspector inspector =
        new FSImageTransactionalStorageInspector();
    storage.inspectStorageDirs(inspector);

    long minImageTxId = getImageTxIdToRetain(inspector);
    purgeCheckpointsOlderThan(inspector, minImageTxId);

    // If fsimage_N is the image we want to keep, then we need to keep
    // all txns > N. We can remove anything < N+1, since fsimage_N
    // reflects the state up to and including N. However, we also
    // provide a "cushion" of older txns that we keep, which is
    // handy for HA, where a remote node may not have as many
    // new images.
    //
    // First, determine the target number of extra transactions to retain based
    // on the configured amount.
    long minimumRequiredTxId = minImageTxId + 1;
    long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

    ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
    purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
    Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
        @Override
        public int compare(EditLogInputStream a, EditLogInputStream b) {
            return ComparisonChain.start()
                .compare(a.getFirstTxId(), b.getFirstTxId())
                .compare(a.getLastTxId(), b.getLastTxId())
                .result();
        }
    });

    // Remove from consideration any edit logs that are in fact required.
    while (editLogs.size() > 0 &&
        editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) {
        editLogs.remove(editLogs.size() - 1);
    }

    // Next, adjust the number of transactions to retain if doing so would mean
    // keeping too many segments around.
    while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
        purgeLogsFrom = editLogs.get(0).getLastTxId() + 1;
        editLogs.remove(0);
    }

    // Finally, ensure that we're not trying to purge any transactions that we
    // actually need.
    if (purgeLogsFrom > minimumRequiredTxId) {
        throw new AssertionError("Should not purge more edits than required to "
            + "restore: " + purgeLogsFrom + " should be <= "
            + minimumRequiredTxId);
    }

    purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
}