对于一台主机,我们可以在它上面部署多个DataNode进程,这也就是说在一台机器上有多个DataNode节点,而且这些DataNode节点属于同一个HDFS集群,那么这里就有一个问题了,NameNode节点是如何考虑整个集群的负载均衡的?如果NameNode节点以DataNode节点为单位来考虑负载均衡的话,就会出现包含有多个DataNode节点的主机负载过重,所以就不得不以主机为单位来计算集群的负载情况了。在NameNode中用Host2NodesMap类来存储主机与DataNode节点之间的映射。
当一个DataNode节点向NameNode注册成功的时候,NameNode就会把这个DataNode节点存储到它的host2DataNodeMap属性中,也就是Host2NodesMap类的一个实例,这个类主要包含三个属性:
map:存储集群中所有主机上的所有DataNode节点;
r:用来随机选择给定主机上的某一个DataNode节点;
hostmapLock:控制对map的同步操作;
Host2NodesMap类主要负责对集群中的DataNode节点按在它们所在的主机进行分类管理,它可用来添加、删除、查询一个DataNode节点,它也可以按照主机或者DataNode的名字来查询。这些操作对应的方法是:
//判断一个DataNode节点在不在集群中
boolean contains(DatanodeDescriptor node) {
if (node==null) {
return false;
}
String host = node.getHost();
hostmapLock.readLock().lock();
try {
DatanodeDescriptor[] nodes = map.get(host);
if (nodes != null) {
for(DatanodeDescriptor containedNode:nodes) {
if (node==containedNode) {
return true;
}
}
}
} finally {
hostmapLock.readLock().unlock();
}
return false;
}
//添加一个数据节点
boolean add(DatanodeDescriptor node) {
hostmapLock.writeLock().lock();
try {
if (node==null || contains(node)) {
return false;
}
//找到DataNode节点所在的host
String host = node.getHost();
DatanodeDescriptor[] nodes = map.get(host);
DatanodeDescriptor[] newNodes;
if (nodes==null) {
newNodes = new DatanodeDescriptor[1];
newNodes[0]=node;
} else { // rare case: more than one datanode on the host
newNodes = new DatanodeDescriptor[nodes.length+1];
System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
newNodes[nodes.length] = node;
}
map.put(host, newNodes);
return true;
} finally {
hostmapLock.writeLock().unlock();
}
}
//删除一个节点
boolean remove(DatanodeDescriptor node) {
if (node==null) {
return false;
}
String host = node.getHost();
hostmapLock.writeLock().lock();
try {
DatanodeDescriptor[] nodes = map.get(host);
if (nodes==null) {
return false;
}
if (nodes.length==1) {
if (nodes[0]==node) {
map.remove(host);
return true;
} else {
return false;
}
}
//rare case
int i=0;
for(; i<nodes.length; i++) {
if (nodes[i]==node) {
break;
}
}
if (i==nodes.length) {
return false;
} else {
DatanodeDescriptor[] newNodes;
newNodes = new DatanodeDescriptor[nodes.length-1];
System.arraycopy(nodes, 0, newNodes, 0, i);
System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
map.put(host, newNodes);
return true;
}
} finally {
hostmapLock.writeLock().unlock();
}
}
//根据主机名获取这个主机上的一个DataNode节点,如果这个主机上有多个DataNode节点,则随机选一个
DatanodeDescriptor getDatanodeByHost(String host) {
if (host==null) {
return null;
}
hostmapLock.readLock().lock();
try {
DatanodeDescriptor[] nodes = map.get(host);
// no entry
if (nodes== null) {
return null;
}
// one node
if (nodes.length == 1) {
return nodes[0];
}
// more than one node
return nodes[r.nextInt(nodes.length)];
} finally {
hostmapLock.readLock().unlock();
}
}
//根据DataNode节点的名字来找到这个节点
public DatanodeDescriptor getDatanodeByName(String name) {
if (name==null) {
return null;
}
int colon = name.indexOf(":");
String host;
if (colon < 0) {
host = name;
} else {
host = name.substring(0, colon);
}
hostmapLock.readLock().lock();
try {
DatanodeDescriptor[] nodes = map.get(host);
// no entry
if (nodes== null) {
return null;
}
for(DatanodeDescriptor containedNode:nodes) {
if (name.equals(containedNode.getName())) {
return containedNode;
}
}
return null;
} finally {
hostmapLock.readLock().unlock();
}
}