Reporter reporter) throws IOException;
}
(3)TextFileInputFormat类仅仅实现了FileInputFormat类的getRecordReader方法,并且重写了isSplitable方法,他并没有实现getSplits方法,由此可知,他的getSplits的实现还是交由父类FileInputFormat来实现的。(这里需要注意TextFileInputFormat并不是InputFormat的子类,TextFileInputFormat它仅仅是继承了InputFormat的getRecordReader的方法而已。)
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
//子类重新实现了isSplitable方法
protected boolean isSplitable(FileSystem fs, Path file) {
final CompressionCodec codec = compressionCodecs.getCodec(file);
if (null == codec) {
return true;
}
return codec instanceof SplittableCompressionCodec;
}
//该方法实现了将文件中的数据读入到对应的Map方法中。
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
}
从上面可以看出一个Text格式的文件是通过什么样的类继承层次输入到map方法中。下面主要介绍一下,到底是如何切分的?我们从类的继承层次关系上可以看出,具体的切分方式是通过FileInputFormat类来实现的。因此,要了解文件是如何切分的,只需要查看一下FileInputFormat类中的getSplits方法的实现细节即可。下面我再次把FileInputFormat类中的getSplits方法贴出来:然后分析每一句代码。
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job); //列出当前job中所有的输入文件
// Save the number of input files in the job-conf
job.setLong(NUM_INPUT_FILES, files.length); //设置当前job的输入文件数目
//计算当前job所有输入文件总的大小
long totalSize = 0; // compute total size
//遍历每一个文件
for (FileStatus file: files) { // check we have valid files
if (file.isDir()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// numSplits是分片数,goalSize是平均每一个分片的大小,minSize是每个分片最小值
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
minSplitSize);
// generate splits 计算分片
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
//获取文件的位置
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
//isSplitable方法根据对应文件名称判断对应文件是否可以切分
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();//获取文件块的大小
// computeSplitSize方法计算真正的分片大小
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
long bytesRemaining = length;//文件剩余大小
// SPLIT_SLOP=1.1,文件大小/分片的大小> SPLIT_SLOP则进行切分。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// splitHosts用来记录分片元数据信息(包括切片的位置,大小等等)
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) { //如果文件不能切分,相应的会将整个文件作为一个分片。
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
//真正计算分片大小的地方。
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}