Big Data Analytics

Output Formats

The InputFormat and RecordReader interfaces define how data is read into a MapReduce program. By analogy, the OutputFormat and RecordWriter interfaces dictate how to write the results of a job back to the underlying permanent storage. Several useful OutputFormat implementations are described in Module 4. The default format (TextOutputFormat) will write (key, value) pairs as strings to individual lines of an output file (using the toString() methods of the keys and values). The SequenceFileOutputFormat will keep the data in binary, so it can be later read quickly by the SequenceFileInputFormat. These classes make use of the write() and readFields() methods of the specific Writable classes used by your MapReduce pass.

You can define your own OutputFormat implementation that will write data to an underlying medium in the format that you control. If you want to write to output files on the local system or in HDFS, you should extend the FileOutputFormat abstract class. When you want to use a different output format, you can control this with the JobConf.setOutputFormat() method.

Why might we want to define our own OutputFormat? A custom OutputFormat allows you to exactly control what data is put into a file, and how it is laid out. Suppose another process you use has a custom input file format. Your MapReduce job is supposed to generate inputs compatible with this program. You may develop an OutputFormat implementation which will produce the correct type of file to work with this subsequent process in your tool chain. As an example of how to write an OutputFormat, we will walk through the implementation of a simple XML-based format developed for this tutorial, XmlOutputFormat. Given a set of (key, value) pairs from the Reducer, (e.g., (k1, v1), (k2, v2), etc...) this will generate a file laid out like so:


v1
v2

...


The code to generate these files is presented below:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class XmlOutputFormat extends FileOutputFormat {

protected static class XmlRecordWriter implements RecordWriter {
private static final String utf8 = "UTF-8";

private DataOutputStream out;

public XmlRecordWriter(DataOutputStream out) throws IOException {
this.out = out;
out.writeBytes("\n");
}

/**
* Write the object to the byte stream, handling Text as a special case.
*
* @param o
* the object to print
* @throws IOException
* if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}

private void writeKey(Object o, boolean closing) throws IOException {
out.writeBytes("<");
if (closing) {
out.writeBytes("/");
}
writeObject(o);
out.writeBytes(">");
if (closing) {
out.writeBytes("\n");
}
}

public synchronized void write(K key, V value) throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;

if (nullKey && nullValue) {
return;
}

Object keyObj = key;

if (nullKey) {
keyObj = "value";
}

writeKey(keyObj, false);

if (!nullValue) {
writeObject(value);
}

writeKey(keyObj, true);
}

public synchronized void close(Reporter reporter) throws IOException {
try {
out.writeBytes("
\n");
} finally {
// even if writeBytes() fails, make sure we close the stream
out.close();
}
}
}

public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new XmlRecordWriter(fileOut);
}
}

The FileOutputFormat which XmlOutputFormat subclasses will handle most of the heavy lifting. The only method directly implemented in XmlOutputFormat is getRecordWriter(), which is a factory method for the RecordWriter object which will actually write the file. The inner class XmlRecordWriter is the implementation which generates files in the format shown above. The RecordWriter is initialized with an output stream connected to a file in the output file system. At the same time, the XML prologue is written into the output file. The particular output file system and filename associated with this output stream are determined based on the current job configuration. The XmlRecordWriter's write() method is then called each time a (key, value) pair is provided to the OutputCollector by the Reducer. When the Reducer finishes, the close() method of the XmlRecordWriter will write the XML epilogue and close the underlying stream.