addBlock()负责分配一个新的block以及该block备份存储的datanode。addBlock函数声明如下:
public LocatedBlock addBlock(String src, String clientName,DatanodeInfo[] excludedNodes)
其中src代表需要写入新block的文件;clientName代表写入该block的client、excludedNodes代表该block不能存储的datanode。
首先让我们思考一下addBlock应该如何实现。首先,我们需要修改src文件的INodeFile以便将新增block添加到INodeFile当中。因为INodeFile保存了最后一个Block存放的datanode,因此需要为新增的block分配datanode。下面我们看一下addBlock具体是怎么实现的。
在NameNode中,addBlock是通过FSNamesystem.getAdditionalBlock方法来实现的。在getAdditionalBlock中,首先检查内存中INode和block数量之和是否已经超过系统设置的阈值。然后检查Lease,查看当前client是否拥有文件写锁。除此之外,还需要检查INodeFile倒数第二个block是否已经完成所有备份存储的复制。如果所有这些检查都没有问题就挑选新block所有存储节点(datanode)。存储节点(datanode)挑选完成后,就new一个新的block。新block创建完成后,会在FSNamesystem.blocksMap中保存该block(其实是对应的BlockInfo)并且在INodeFile中添加该block。
注意:通过阅读源码我们可以知道,在addBlock方法中就已经将该block存入到blocksMap当中。这就需要我们考虑当DataNode不断的report该节点存放的block时修改的是不是blocksMap,其之间的逻辑关系是什么样子的。
public LocatedBlock getAdditionalBlock(String src, String clientName,
List<Node> excludedNodes) throws IOException {
long fileLength, blockSize;
int replication;
DatanodeDescriptor clientNode = null;
Block newBlock = null;
NameNode.stateChangeLog
.debug("BLOCK* NameSystem.getAdditionalBlock: file " + src
+ " for " + clientName);
synchronized (this) {
// have we exceeded the configured limit of fs objects.
checkFsObjectLimit();
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
//
// If we fail this, bad things happen!
//
if (!checkFileProgress(pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
fileLength = pendingFile.computeContentSummary().getLength();
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = (int) pendingFile.getReplication();
}
// choose targets for the new block tobe allocated.
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
clientNode, excludedNodes, blockSize);
if (targets.length < this.minReplication) {
throw new IOException("File " + src
+ " could only be replicated to " + targets.length
+ " nodes, instead of " + minReplication);
}
// Allocate a new block and record it in the INode.
synchronized (this) {
if (isInSafeMode()) {
throw new SafeModeException("Cannot add block to " + src,
safeMode);
}
INode[] pathINodes = dir.getExistingPathINodes(src);
int inodesLen = pathINodes.length;
checkLease(src, clientName, pathINodes[inodesLen - 1]);
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) pathINodes[inodesLen - 1];
if (!checkFileProgress(pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet:" + src);
}
// allocate new block record block locations in INode.
newBlock = allocateBlock(src, pathINodes);
pendingFile.setTargets(targets);
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
}
}
// Create next block
LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
if (isAccessTokenEnabled) {
b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
return b;
}
private Block allocateBlock(String src, INode[] inodes) throws IOException {
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
while (isValidBlock(b)) {
b.setBlockId(FSNamesystem.randBlockId.nextLong());
}
b.setGenerationStamp(getGenerationStamp());
b = dir.addBlock(src, inodes, b);
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " + src
+ ". " + b);
return b;
}
Block addBlock(String path, INode[] inodes, Block block) throws IOException {
waitForReady();
synchronized (rootDir) {
INodeFile fileNode = (INodeFile) inodes[inodes.length - 1];
// check quota limits and updated space consumed
updateCount(
inodes,
inodes.length - 1,
0,
fileNode.getPreferredBlockSize()
* fileNode.getReplication(), true);
// associate the new list of blocks with this file
namesystem.blocksMap.addINode(block, fileNode);
BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
fileNode.addBlock(blockInfo);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " + path
+ " with " + block + " block is added to the in-memory "
+ "file system");
}
return block;
}