@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return this.value;
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
throws IOException, InterruptedException {
fileSplit = (FileSplit) inputSplit;
Configuration job = tacontext.getConfiguration();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(job);
fis = fs.open(file);
}
@Override
public boolean nextKeyValue() {
if (key == null) {
key = new Text();
}
if (value == null) {
value = new BytesWritable();
}
if (!processed) {
byte[] content = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
System.out.println(file.getName());
key.set(file.getName());
try {
IOUtils.readFully(fis, content, 0, content.length);
// value.set(content, 0, content.length);
value.set(new BytesWritable(content));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
IOUtils.closeStream(fis);
}
processed = true;
return true;
}
return false;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return processed ? fileSplit.getLength() : 0;
}
}
在 Oracle 数据库中实现 MapReduce