DataNode: 用于存储HDFS的数据,
public class DataNode extends Configured
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {。。。}
1,实现了 InterDatanodeProtocol 和 ClientDatanodeProtocol 两个接口,用于与其他 DataNode 以及客户端通信并接受它们的请求。
2,实现Runnable 接口
首先是来看下run()方法
/**
 * Main loop of the DataNode thread.
 *
 * Starts the data transfer server, then keeps calling
 * startDistributedUpgradeIfNeeded() and offerService() for as long as
 * shouldRun is set. An Exception escaping an iteration is logged; if the
 * node is still supposed to run, the loop pauses 5 seconds before retrying.
 * Once the loop ends, shutdown() is invoked.
 */
public void run() {
  LOG.info(dnRegistration + "In DataNode.run, data = " + data);

  // start dataXceiveServer
  dataXceiverServer.start();

  while (shouldRun) {
    try {
      startDistributedUpgradeIfNeeded();
      offerService();
    } catch (Exception serviceFailure) {
      LOG.error("Exception: " + StringUtils.stringifyException(serviceFailure));
      if (!shouldRun) {
        // Stop was requested while handling the failure: skip the pause and
        // let the loop condition terminate the loop.
        continue;
      }
      try {
        Thread.sleep(5000);
      } catch (InterruptedException interrupted) {
        // Nothing to do: the pause simply ends early.
      }
    }
  }

  LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
  shutdown();
}
①, dataXceiverServer.start();启动数据监听线程;每当 ss.accept() 接受到一个新的连接时,就为该连接开启一个 DataXceiver 守护线程(Daemon)来处理
DataXceiverServer.java
/**
 * Accept loop of the data transfer server.
 *
 * While datanode.shouldRun is set, accepts a connection on the server socket
 * ss, enables TCP_NODELAY on it and starts a Daemon thread running a
 * DataXceiver for that connection. An accept timeout is ignored so the loop
 * can re-check shouldRun; any other IOException is logged as a warning; any
 * other Throwable is logged as an error and clears datanode.shouldRun.
 *
 * If an accepted socket could not be handed to a started DataXceiver thread
 * (for example setTcpNoDelay or thread start failed), the socket is closed
 * here so that it does not leak.
 *
 * After the loop ends the server socket is closed.
 */
public void run() {
  while (datanode.shouldRun) {
    Socket s = null;
    // Becomes true only once the handler thread has been started; until
    // then this method still owns the accepted socket.
    boolean handedOff = false;
    try {
      s = ss.accept();
      s.setTcpNoDelay(true);
      new Daemon(datanode.threadGroup,
          new DataXceiver(s, datanode, this)).start();
      handedOff = true;
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (IOException ie) {
      LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
          + StringUtils.stringifyException(ie));
    } catch (Throwable te) {
      LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
          + StringUtils.stringifyException(te));
      datanode.shouldRun = false;
    } finally {
      if (!handedOff && s != null) {
        try {
          s.close();
        } catch (IOException ignored) {
          // Best-effort cleanup; the original failure was already logged.
        }
      }
    }
  }
  try {
    ss.close();
  } catch (IOException ie) {
    LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
        + StringUtils.stringifyException(ie));
  }
}
DataXceiver 主要负责在 DataXceiverServer 接受的 Socket 上读取版本号和操作码,并根据操作码执行相应的数据读写操作
-------DataXceiver.java----------
/**
 * Serves one data-transfer request arriving on socket s.
 *
 * Reads a version (short) and an opcode (byte) from the socket, rejects the
 * request when the version differs from
 * DataTransferProtocol.DATA_TRANSFER_VERSION or when the current xceiver
 * count exceeds dataXceiverServer.maxXceiverCount, then dispatches to the
 * handler for the opcode and updates the matching metrics.
 *
 * Any Throwable raised along the way is logged and not propagated. In all
 * cases the input stream and the socket are closed and the socket is removed
 * from dataXceiverServer.childSockets.
 */
public void run() {
  DataInputStream input = null;
  try {
    input = new DataInputStream(
        new BufferedInputStream(NetUtils.getInputStream(s),
                                SMALL_BUFFER_SIZE));

    final short wireVersion = input.readShort();
    if (wireVersion != DataTransferProtocol.DATA_TRANSFER_VERSION) {
      throw new IOException( "Version Mismatch" );
    }

    // True when the peer address equals this socket's local address.
    final boolean fromLocalHost = s.getInetAddress().equals(s.getLocalAddress());
    final byte opcode = input.readByte();

    // Make sure the xciver count is not exceeded
    final int activeXceivers = datanode.getXceiverCount();
    if (activeXceivers > dataXceiverServer.maxXceiverCount) {
      throw new IOException("xceiverCount " + activeXceivers
                            + " exceeds the limit of concurrent xcievers "
                            + dataXceiverServer.maxXceiverCount);
    }

    final long opStart = DataNode.now();
    switch (opcode) {
      case DataTransferProtocol.OP_READ_BLOCK: {
        readBlock(input);
        datanode.myMetrics.readBlockOp.inc(DataNode.now() - opStart);
        if (fromLocalHost) {
          datanode.myMetrics.readsFromLocalClient.inc();
        } else {
          datanode.myMetrics.readsFromRemoteClient.inc();
        }
        break;
      }
      case DataTransferProtocol.OP_WRITE_BLOCK: {
        writeBlock(input);
        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - opStart);
        if (fromLocalHost) {
          datanode.myMetrics.writesFromLocalClient.inc();
        } else {
          datanode.myMetrics.writesFromRemoteClient.inc();
        }
        break;
      }
      case DataTransferProtocol.OP_READ_METADATA: {
        readMetadata(input);
        datanode.myMetrics.readMetadataOp.inc(DataNode.now() - opStart);
        break;
      }
      case DataTransferProtocol.OP_REPLACE_BLOCK: {
        // for balancing purpose; send to a destination
        replaceBlock(input);
        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - opStart);
        break;
      }
      case DataTransferProtocol.OP_COPY_BLOCK: {
        // for balancing purpose; send to a proxy source
        copyBlock(input);
        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - opStart);
        break;
      }
      case DataTransferProtocol.OP_BLOCK_CHECKSUM: {
        // get the checksum of a block
        getBlockChecksum(input);
        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - opStart);
        break;
      }
      default:
        throw new IOException("Unknown opcode " + opcode + " in data stream");
    }
  } catch (Throwable failure) {
    // Failures are logged here and never propagate out of run().
    LOG.error(datanode.dnRegistration + ":DataXceiver",failure);
  } finally {
    LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
              + datanode.getXceiverCount());
    IOUtils.closeStream(input);
    IOUtils.closeSocket(s);
    dataXceiverServer.childSockets.remove(s);
  }
}