Hadoop源码分析 HDFS ClientProtocol

addBlock()负责分配一个新的block以及该block备份存储的datanodeaddBlock函数声明如下:

public LocatedBlock addBlock(String src, String clientName,DatanodeInfo[] excludedNodes) 

其中src代表需要写入新block的文件;clientName代表写入该blockclientexcludedNodes代表该block不能存储的datanode

首先让我们思考一下addBlock应该如何实现。首先,我们需要修改src文件的INodeFile以便将新增block添加到INodeFile当中。因为INodeFile保存了最后一个Block存放的datanode,因此需要为新增的block分配datanode。下面我们看一下addBlock具体是怎么实现的。

NameNode中,addBlock是通过FSNamesystem.getAdditionalBlock方法来实现的。在getAdditionalBlock中,首先检查内存中INodeblock数量之和是否已经超过系统设置的阈值。然后检查Lease,查看当前client是否拥有文件写锁。除此之外,还需要检查INodeFile倒数第二个block是否已经完成所有备份存储的复制。如果所有这些检查都没有问题就挑选新block所有存储节点(datanode)。存储节点(datanode)挑选完成后,就new一个新的block。新block创建完成后,会在FSNamesystem.blocksMap中保存该block(其实是对应的BlockInfo)并且在INodeFile中添加该block

注意:通过阅读源码我们可以知道,在addBlock方法中就已经将该block存入到blocksMap当中。这就需要我们考虑当DataNode不断的report该节点存放的block时修改的是不是blocksMap,其之间的逻辑关系是什么样子的。

public LocatedBlock getAdditionalBlock(String src, String clientName,

List<Node> excludedNodes) throws IOException {

long fileLength, blockSize;

int replication;

DatanodeDescriptor clientNode = null;

Block newBlock = null;

NameNode.stateChangeLog

.debug("BLOCK* NameSystem.getAdditionalBlock: file " + src

" for " + clientName);

synchronized (this) {

// have we exceeded the configured limit of fs objects.

checkFsObjectLimit();

INodeFileUnderConstruction pendingFile = checkLease(src, clientName);

//

// If we fail this, bad things happen!

//

if (!checkFileProgress(pendingFile, false)) {

throw new NotReplicatedYetException("Not replicated yet:" + src);

}

fileLength = pendingFile.computeContentSummary().getLength();

blockSize = pendingFile.getPreferredBlockSize();

clientNode = pendingFile.getClientNode();

replication = (int) pendingFile.getReplication();

}

// choose targets for the new block tobe allocated.

DatanodeDescriptor targets[] = replicator.chooseTarget(replication,

clientNode, excludedNodes, blockSize);

if (targets.length < this.minReplication) {

throw new IOException("File " + src

" could only be replicated to " + targets.length

" nodes, instead of " + minReplication);

}

// Allocate a new block and record it in the INode.

synchronized (this) {

if (isInSafeMode()) {

throw new SafeModeException("Cannot add block to " + src,

safeMode);

}

INode[] pathINodes = dir.getExistingPathINodes(src);

int inodesLen = pathINodes.length;

checkLease(src, clientName, pathINodes[inodesLen - 1]);

INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) pathINodes[inodesLen - 1];

if (!checkFileProgress(pendingFile, false)) {

throw new NotReplicatedYetException("Not replicated yet:" + src);

}

// allocate new block record block locations in INode.

newBlock = allocateBlock(src, pathINodes);

pendingFile.setTargets(targets);

for (DatanodeDescriptor dn : targets) {

dn.incBlocksScheduled();

}

}

// Create next block

LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);

if (isAccessTokenEnabled) {

b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),

EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));

}

return b;

}

private Block allocateBlock(String src, INode[] inodes) throws IOException {

Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);

while (isValidBlock(b)) {

b.setBlockId(FSNamesystem.randBlockId.nextLong());

}

b.setGenerationStamp(getGenerationStamp());

b = dir.addBlock(src, inodes, b);

NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " + src

". " + b);

return b;

}

Block addBlock(String path, INode[] inodes, Block block) throws IOException {

waitForReady();

synchronized (rootDir) {

INodeFile fileNode = (INodeFile) inodes[inodes.length - 1];

// check quota limits and updated space consumed

updateCount(

inodes,

inodes.length - 1,

0,

fileNode.getPreferredBlockSize()

* fileNode.getReplication(), true);

// associate the new list of blocks with this file

namesystem.blocksMap.addINode(block, fileNode);

BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);

fileNode.addBlock(blockInfo);

NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: " + path

" with " + block + " block is added to the in-memory "

"file system");

}

return block;

}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/cc5275540898bfd66e193064a39b19a5.html