众所周知,我们需要Hadoop来分布式存储我们的数据,提高并发和吞吐量,造就了Mapreduce框架的易用性。那对于整个这个过程来说,最开始需要我们认识到的是文件是如何存储在hadoop系统上的。
Hadoop可以分为三个部分,Client端,namenode端和datanode端。他们之间的协作做成了这个庞大的分布式文件系统。文件从客户端这个接口,进入系统,由客户端和namenode通信,使用反射机制,告知Client文件所需要存储的datanode列表,然后就可以进行传输了,当然,我们在这里屏蔽了所有hadoop错误处理的过程,即便这是hadoop的最大的优势之一。
大体的过程知道了,那么下面我们可以深入源码,来看看具体的实现。
首先在Client端,假设你写了一个mapreduce程序就是用来存储一个文件的,代码如下:
String localSrc = "/root/Desktop/examp.c";
String dst = "/fangpei/examp.c";
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
URI uri = URI.create(dst);
FileSystem fs = FileSystem.get(uri, conf);
OutputStream out = fs.create(new Path(dst),new Progressable(){
public void progress(){
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096,true);
FileSystem.get(uri, conf);
用来获取文件系统。其中FileSystem类是抽象类,会进入内存的cache寻找已创建的文件系统(FileSystem.cache.get(uri,conf)),如果没有则最终会调用newInstance去创建DistrubutedFileSystem这个类的对象,放入cache中并返回这个文件系统实例。
OutputStream out = fs.create(new Path(dst),new Progressable(){
public void progress(){
System.out.print(".");
}
});
这个create会调用DistributedFileSystem的create。这个create创建了一个DFSOutputStream输出流,在这个输出流的构造函数中会映射调用namenode方法用于在分布式系统上创建一个文件。
namenode.create(src, masked, clientName, overwrite, createParent, replication, blockSize);
然后启动streamer
streamer.start();
DataStreamer这个线程类用于真正的数据传输。里面保持了一个dataqueue,会不停的监听这个队列是否为空。
启动完数据监听守护进程后,对数据IO通道的读写操作如下:
IOUtils.copyBytes(in, out, 4096,true);
其中,copyBytes最终会调用到writechunk函数,该函数如下:
currentPacket.writeChecksum(checksum, 0, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock += len;
............................
enqueueCurrentPacket();
这里就是将数据包组装并且压进队列,然后DataStreamer就会执行发送队列。