You can define your own OutputFormat implementation that will write data to an underlying medium in the format that you control. If you want to write to output files on the local system or in HDFS, you should extend the FileOutputFormat abstract class. When you want to use a different output format, you can control this with the JobConf.setOutputFormat() method.
Why might we want to define our own OutputFormat? A custom OutputFormat allows you to exactly control what data is put into a file, and how it is laid out. Suppose another process you use has a custom input file format. Your MapReduce job is supposed to generate inputs compatible with this program. You may develop an OutputFormat implementation which will produce the correct type of file to work with this subsequent process in your tool chain. As an example of how to write an OutputFormat, we will walk through the implementation of a simple XML-based format developed for this tutorial, XmlOutputFormat. Given a set of (key, value) pairs from the Reducer, (e.g., (k1, v1), (k2, v2), etc...) this will generate a file laid out like so:
...
The code to generate these files is presented below:
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
public class XmlOutputFormat
protected static class XmlRecordWriter
private static final String utf8 = "UTF-8";
private DataOutputStream out;
public XmlRecordWriter(DataOutputStream out) throws IOException {
this.out = out;
out.writeBytes("
}
/**
* Write the object to the byte stream, handling Text as a special case.
*
* @param o
* the object to print
* @throws IOException
* if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}
private void writeKey(Object o, boolean closing) throws IOException {
out.writeBytes("<");
if (closing) {
out.writeBytes("/");
}
writeObject(o);
out.writeBytes(">");
if (closing) {
out.writeBytes("\n");
}
}
public synchronized void write(K key, V value) throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
Object keyObj = key;
if (nullKey) {
keyObj = "value";
}
writeKey(keyObj, false);
if (!nullValue) {
writeObject(value);
}
writeKey(keyObj, true);
}
public synchronized void close(Reporter reporter) throws IOException {
try {
out.writeBytes("
} finally {
// even if writeBytes() fails, make sure we close the stream
out.close();
}
}
}
public RecordWriter
String name, Progressable progress) throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new XmlRecordWriter
}
}
The FileOutputFormat which XmlOutputFormat subclasses will handle most of the heavy lifting. The only method directly implemented in XmlOutputFormat is getRecordWriter(), which is a factory method for the RecordWriter object which will actually write the file. The inner class XmlRecordWriter is the implementation which generates files in the format shown above. The RecordWriter is initialized with an output stream connected to a file in the output file system. At the same time, the XML prologue is written into the output file. The particular output file system and filename associated with this output stream are determined based on the current job configuration. The XmlRecordWriter's write() method is then called each time a (key, value) pair is provided to the OutputCollector by the Reducer. When the Reducer finishes, the close() method of the XmlRecordWriter will write the XML epilogue and close the underlying stream.