本次主要介绍DN端进行数据传输和接受的类BlockSender和BlockReceiver,其中BlockSender是读取DN本地block数据传送回数据请求端,BlockReceiver是接受存储数据,写入到本地的block中。
首选介绍BlockSender的主要函数的作用:
构造函数
94 this.blockLength = datanode.data.getVisibleLength(block); 99 checksumIn = new DataInputStream( 100 new BufferedInputStream(datanode.data.getMetaDataInputStream(block), 101 BUFFER_SIZE)); 123 bytesPerChecksum = checksum.getBytesPerChecksum(); 129 checksumSize = checksum.getChecksumSize(); 145 offset = (startOffset - (startOffset % bytesPerChecksum)); 146 if (length >= 0) { 147 // Make sure endOffset points to end of a checksumed chunk. 148 long tmpLen = startOffset + length; 149 if (tmpLen % bytesPerChecksum != 0) { 150 tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum); 151 } 152 if (tmpLen < endOffset) { 153 endOffset = tmpLen; 154 } 155 } 156 157 // seek to the right offsets 158 if (offset > 0) { 159 long checksumSkip = (offset / bytesPerChecksum) * checksumSize; 160 // note blockInStream is seeked when created below 161 if (checksumSkip > 0) { 162 // Should we use seek() for checksum file as well? 163 IOUtils.skipFully(checksumIn, checksumSkip); 164 } 165 } 166 seqno = 0; 167 168 blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset在开始介绍构造函数之前,先说一下DN上每个block的存放形式,block以blk_3910275445015356807和blk_3910275445015356807_1001.meta这样的形式存在,第一个文件存储内容,第二个存储元信息,所以才读取block的时候需要读取这两个文件。
开始看一下构造函数的主要作用,94行获取block文件的长度,99行打开block对应的元信息文件,123行获取每个chunk的大小,129行获取chunksum的大小。那么chunk和chunksum是什么呢,这里需要介绍下DN传送和接受数据的特点,每次传输和接受一个package,每个package包含若干个chunk,每个chunk对应的元信息就是chunksum。有了这个背景知识就知道145行的含义了,我们要求读取的数据可以从任意位置开始,但是DN传送时却要从一个整的chunk开始,因为元信息就是按照chunk为单位计算和存储的,这个ooffset就是我们要求读取的位置和实际读取位置的一个偏移,这个需要传回客户端,根据这个值用户才不会多介绍数据。149和150行对应读取的末尾做同样的处理,但是这个地方为什么没有把多读取数据的偏移量返回呢,是因为我们既然已经知道了起始位置有知道了读取的长度,所以我们知道实际的末尾,即使传送的数据有多余信息也不会造成影响。163行是使读取的元信息的文件流移动到要读取的位置,168行是打开block文件,并将文件流偏移到读取的位置。
构造函数初始化完毕以后,再来看传送数据的主函数sendBlock
383 long sendBlock(DataOutputStream out, OutputStream baseStream, 384 BlockTransferThrottler throttler) throws IOException { 396 try { 397 checksum.writeHeader(out); 398 if ( chunkOffsetOK ) { 399 out.writeLong( offset ); 400 } 433 ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); 434 435 while (endOffset > offset) { 436 long len = sendChunks(pktBuf, maxChunksPerPacket, 437 streamForSendChunks); 438 offset += len; 439 totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum* 440 checksumSize); 441 seqno++; 442 } 443 try { 444 out.writeInt(0); // mark the end of block 445 out.flush();397行是首先传送一些元数据信息,399传送读取数据时多余的偏移,433行声明读取数据的缓冲区,436调用另外一个函数真正进行数据的传输,每次传送一个package,444行传送一个0表示传送结束。