Big Data Analytics

Using Amazon Web Services

Hadoop's power comes from its ability to perform work on a large number of machines simultaneously. What if you want to experiment with Hadoop, but do not have many machines? While operations on a two- or four-node cluster are functionally equivalent to those on a 40- or 100-node cluster, processing larger volumes of data requires a larger number of nodes.

Amazon provides machines for rent on demand through their Elastic Compute Cloud (a.k.a. EC2) service. EC2 is part of a broader set of services collectively called Amazon Web Services, or AWS. EC2 allows you to request a set of nodes ("instances" in their parlance) for as long as you need them. You pay by the instance-hour, plus costs for bandwidth. You can use EC2 instances to run a Hadoop cluster, and Hadoop comes with a set of scripts that will provision EC2 instances for you.

The first step in this process is to visit the EC2 web site and click "Sign Up For This Web Service". You will need to create an account and provide billing information. Then follow the instructions in the Getting Started Guide to set up your account and configure your system to run the AWS tools.

Once you have done so, follow the instructions in the Hadoop wiki specific to running Hadoop on Amazon EC2. While more details are available in the above document, the shortest steps to provisioning a cluster are:

* Edit src/contrib/ec2/bin/hadoop-ec2-env.sh to contain your Amazon account information and parameters about the desired cluster size.
* Execute src/contrib/ec2/bin/hadoop-ec2 launch-cluster.

After the cluster has been started, you can log in to the head node over ssh with the bin/hadoop-ec2 login script, and perform your MapReduce computation. When you are done, log out and type bin/hadoop-ec2 terminate-cluster to release the EC2 instances. The contents of the virtual hard drives on the instances will disappear, so be sure to copy off any important data with scp or another tool first!

A very thorough introduction to configuring Hadoop on EC2 and running a test job is provided in an article on the Amazon Web Services Developer Connection site.

Distributing Debug Scripts

Hadoop will generate a large number of log files for a job, distributed across all the nodes that participated in the job's execution. Often only a subset of these logs will be of interest when debugging failing tasks. MapReduce can help with this by running a user-provided script when either a map or reduce task fails. These scripts are given the names of files containing the stdout and stderr from the task, as well as the task's Hadoop log and job.xml file (i.e., its complete JobConf in serialized form).

These scripts will be run on whichever node encounters failing tasks. You can use them to automate whatever makes failing tasks easier to inspect: for example, emailing the stdout/stderr to an administrator address, uploading the failed task's log files to a common NFS-mounted "debug dump" directory, or preserving local state modifications made by map tasks.

Separate scripts can be provided for map and reduce task failure. They each receive as arguments, in order, the names of files containing the task's stdout, stderr, syslog, and jobconf. Because they are run on all the task nodes, and not on the client machine where the job was submitted, these scripts must be sent to the nodes through the distributed cache, described in the Distributing Auxiliary Job Data section below.

The following method will enable failed task scripts on a MapReduce job being prepared. It assumes that you have written two scripts (e.g., bash scripts) which perform your debug actions using the log filenames provided (e.g., copy them to a shared NFS mount). In this example these are /home/aaron/src/map-fail and /home/aaron/src/reduce-fail.

private static final String FAILED_MAP_SCRIPT_NAME = "map-fail";
private static final String FAILED_REDUCE_SCRIPT_NAME = "reduce-fail";

private static final String HDFS_SCRIPT_DIR = "/debug";

private static final String HDFS_FAILED_MAP_SCRIPT =
    HDFS_SCRIPT_DIR + "/" + FAILED_MAP_SCRIPT_NAME;
private static final String HDFS_FAILED_REDUCE_SCRIPT =
    HDFS_SCRIPT_DIR + "/" + FAILED_REDUCE_SCRIPT_NAME;
private static final String LOCAL_FAILED_MAP_SCRIPT =
    "/home/aaron/src/" + FAILED_MAP_SCRIPT_NAME;
private static final String LOCAL_FAILED_REDUCE_SCRIPT =
    "/home/aaron/src/" + FAILED_REDUCE_SCRIPT_NAME;

public static void setupFailedTaskScript(JobConf conf) throws IOException {

  // create a directory on HDFS where we'll upload the fail scripts
  FileSystem fs = FileSystem.get(conf);
  Path debugDir = new Path(HDFS_SCRIPT_DIR);

  // who knows what's already in this directory; let's just clear it.
  if (fs.exists(debugDir)) {
    fs.delete(debugDir, true);
  }

  // ...and then make sure it exists again
  fs.mkdirs(debugDir);

  // upload the local scripts into HDFS
  fs.copyFromLocalFile(new Path(LOCAL_FAILED_MAP_SCRIPT),
      new Path(HDFS_FAILED_MAP_SCRIPT));
  fs.copyFromLocalFile(new Path(LOCAL_FAILED_REDUCE_SCRIPT),
      new Path(HDFS_FAILED_REDUCE_SCRIPT));

  conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
  conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
  DistributedCache.createSymlink(conf);

  URI fsUri = fs.getUri();

  String mapUriStr = fsUri.toString() + HDFS_FAILED_MAP_SCRIPT
      + "#" + FAILED_MAP_SCRIPT_NAME;
  URI mapUri = null;
  try {
    mapUri = new URI(mapUriStr);
  } catch (URISyntaxException use) {
    throw new IOException(use);
  }

  DistributedCache.addCacheFile(mapUri, conf);

  String reduceUriStr = fsUri.toString() + HDFS_FAILED_REDUCE_SCRIPT
      + "#" + FAILED_REDUCE_SCRIPT_NAME;
  URI reduceUri = null;
  try {
    reduceUri = new URI(reduceUriStr);
  } catch (URISyntaxException use) {
    throw new IOException(use);
  }

  DistributedCache.addCacheFile(reduceUri, conf);
}

How does this all work? The scripts are, presumably, initially hosted on the client machine that is submitting the job. The client is responsible for injecting them into HDFS and enabling them in the distributed cache. It first creates the HDFS_SCRIPT_DIR and then uploads the local script files into this directory.

It must then set the commands for the TaskTracker to execute to run the scripts. This is accomplished by the lines:

conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
DistributedCache.createSymlink(conf);

The distributed cache copies the files to the mapred.local.dir on each task node. The TaskTracker will then execute the scripts if necessary. But the TaskTracker does not run with its working directory set to mapred.local.dir. Fortunately, the distributed cache can be told to create symlinks in the working directory for files in the distributed cache. The third line of the snippet above enables this functionality. Now ./FAILED_MAP_SCRIPT_NAME will point to the copy of FAILED_MAP_SCRIPT_NAME in the local cache directory, and the script can be run.

But before that can happen, we must add the files themselves to the distributed cache. (As of now they are only in HDFS.) Ordinarily, we could just call DistributedCache.addCacheFile(new Path("hdfs_path_to_some_file").toUri()) on a filename and that would be sufficient. But since we need to create symlinks, we must provide the distributed cache with information as to how the symlink should be created--what filename it should take in the working directory. This is provided as the URI "anchor" part following the "#" in the URI. A subtlety of Hadoop's Path class is that if you put a '#' in the path string, it will URL-encode it and treat it as part of the filename. Therefore, we use some extra code to construct our URIs manually to ensure that the '#' remains unescaped.

Distributing Auxiliary Job Data

The bulk of the data that you process in a MapReduce job will probably be stored in large files spread across HDFS. You can reliably store petabytes of information in HDFS, and individual jobs can process several terabytes at a time. The usual access model, however, assumes that each mapper reads its own portion of the input: the individual files stored in HDFS are very large and are broken into chunks that are processed in parallel by different mappers.

Sometimes, however, it is necessary for every Mapper to read the same file; for example, a distributed spell-check application would require every Mapper to read in a copy of the dictionary before processing documents. The dictionary will be small (only a few megabytes), but it needs to be widely available so that all nodes can reach it.

Hadoop provides a mechanism specifically for this purpose, called the distributed cache. The distributed cache can contain small data files needed for initialization or libraries of code that may need to be accessed on all nodes in the cluster.

To use the distributed cache to disseminate files, configure it when setting up your job. Use the static DistributedCache.addCacheFile() method to add the names of files which should be sent to all nodes in the system. The file names are specified as URI objects; unless qualified otherwise, they are assumed to refer to files already present on HDFS at the path indicated. You can copy local files to HDFS with the FileSystem.copyFromLocalFile() method.

When you want to retrieve files from the distributed cache (e.g., when the mapper is in its configure() step and wants to load config data like the dictionary mentioned above), use the DistributedCache.getLocalCacheFiles() method to retrieve the list of paths local to the current node for the cached files. These are copies of all cached files, placed in the local file system of each worker machine. (They will be in a subdirectory of mapred.local.dir.) Each of the paths returned by getLocalCacheFiles() can be accessed via regular Java file I/O mechanisms, such as java.io.FileInputStream.

As a cautionary note: If you use the local JobRunner in Hadoop (i.e., what happens if you call JobClient.runJob() in a program with no hadoop-site.xml, or an empty one, accessible), then no local data directory is created; the getLocalCacheFiles() call will return an empty set of results. Unit test code should take this into account.

Suppose that we were writing an inverted index builder. We do not want to include very common words such as "the," "a," "and," etc. These so-called stop words might all be listed in a file. All the mappers should read the stop word list when they are initialized, and then filter the index they generate against this list. We can disseminate a list of stop words to all the Mappers with the following code. The first listing will put the stop-words file into the distributed cache:

public static final String LOCAL_STOPWORD_LIST =
    "/home/aaron/stop_words.txt";

public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

void cacheStopWordList(JobConf conf) throws IOException {
  FileSystem fs = FileSystem.get(conf);
  Path hdfsPath = new Path(HDFS_STOPWORD_LIST);

  // upload the file to hdfs. Overwrite any existing copy.
  fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
      hdfsPath);

  DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
}


This code copies the local stop_words.txt file into HDFS, and then tells the distributed cache to send the HDFS copy to all nodes in the system. The next listing actually uses the file in the mapper:

class IndexMapperExample implements Mapper {
  // the stop word set, loaded in configure() from the distributed cache
  private Set<String> stopWords;

  public void configure(JobConf conf) {
    try {
      String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).getName();
      Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
      if (null != cacheFiles && cacheFiles.length > 0) {
        for (Path cachePath : cacheFiles) {
          if (cachePath.getName().equals(stopwordCacheName)) {
            loadStopWords(cachePath);
            break;
          }
        }
      }
    } catch (IOException ioe) {
      System.err.println("IOException reading from distributed cache");
      System.err.println(ioe.toString());
    }
  }

  void loadStopWords(Path cachePath) throws IOException {
    // note use of regular java.io methods here - this is a local file now
    BufferedReader wordReader = new BufferedReader(
        new FileReader(cachePath.toString()));
    try {
      String line;
      this.stopWords = new HashSet<String>();
      while ((line = wordReader.readLine()) != null) {
        this.stopWords.add(line);
      }
    } finally {
      wordReader.close();
    }
  }

  /* actual map() method, etc go here */
}

The code above belongs in the Mapper instance associated with the index generation process. We retrieve the list of files cached in the distributed cache. We then compare the basename of each file (using Path.getName()) with the one we expect for our stop word list. Once we find this file, we read the words, one per line, into a Set instance that we will consult during the mapping process.

The distributed cache has additional uses too. For instance, you can use the DistributedCache.addArchiveToClassPath() method to send a .jar file to all the nodes. The archive is also inserted into the tasks' classpath, so the classes it contains can be used on all the nodes.
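
A minimal sketch of this (the jar path below is hypothetical, and conf is the job's JobConf) might look like:

// ship a library jar that is already stored in HDFS to every task node,
// and add it to the classpath of the map and reduce tasks
Path analysisLib = new Path("/libs/analysis-helpers.jar"); // hypothetical HDFS path
DistributedCache.addArchiveToClassPath(analysisLib, conf);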

Reporting Custom Metrics

The Hadoop system records a set of metric counters for each job that it runs: for example, the number of input records mapped, or the number of bytes read from or written to HDFS. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.

The Reporter object passed in to your Mapper and Reducer classes can be used to update counters. The same set of counter variables is shared by all Mapper and Reducer instances across your cluster; the values are aggregated by the master node of the cluster, so they are "thread-safe" in this sense.

Counters are incremented through the Reporter.incrCounter() method. The names of the counters are defined as Java enums. The following example demonstrates how to count the number of "A" vs. "B" records seen by the mapper:

public class MyMapper extends MapReduceBase implements Mapper {

  static enum RecordCounters { TYPE_A, TYPE_B, TYPE_UNKNOWN };

  // actual definitions elided
  public boolean isTypeARecord(Text input) { ... }
  public boolean isTypeBRecord(Text input) { ... }

  public void map(Text key, Text val, OutputCollector output,
      Reporter reporter) throws IOException {

    if (isTypeARecord(key)) {
      reporter.incrCounter(RecordCounters.TYPE_A, 1);
    } else if (isTypeBRecord(key)) {
      reporter.incrCounter(RecordCounters.TYPE_B, 1);
    } else {
      reporter.incrCounter(RecordCounters.TYPE_UNKNOWN, 1);
    }

    // actually process the record here, call
    // output.collect( .. ), etc.
  }
}

If you launch your job with JobClient.runJob(), the diagnostic information printed to stdout when the job completes will contain the values of all the counters. Both runJob() and submitJob() will return a RunningJob object that refers to the job in question. The RunningJob.getCounters() method will return a Counters object that contains the values of all the counters so that you can query them programmatically. The Counters.getCounter(Enum key) method returns the value of a particular counter.
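
For example, a driver in the same package as the MyMapper class above might read the counters back like this (a sketch):

RunningJob completedJob = JobClient.runJob(conf);

// the aggregated counter values are available once the job has finished
Counters counters = completedJob.getCounters();
long numTypeA = counters.getCounter(MyMapper.RecordCounters.TYPE_A);
long numTypeB = counters.getCounter(MyMapper.RecordCounters.TYPE_B);
System.out.println("Type A records: " + numTypeA
    + ", type B records: " + numTypeB);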

Partitioning Data

"Partitioning" is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must determine for all of its output (key, value) pairs which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same. If the key "cat" is generated in two separate (key, value) pairs, they must both be reduced together. It is also important for performance reasons that the mappers be able to partition data independently -- they should never need to exchange information with one another to determine the partition for a particular key.

Hadoop uses an interface called Partitioner to determine which partition a (key, value) pair will go to. A single partition refers to all (key, value) pairs which will be sent to a single reduce task. Hadoop MapReduce determines when the job starts how many partitions it will divide the data into. If twenty reduce tasks are to be run (controlled by the JobConf.setNumReduceTasks() method), then twenty partitions must be filled.

The Partitioner interface defines one method which must be implemented:

public interface Partitioner<K, V> extends JobConfigurable {
  int getPartition(K key, V value, int numPartitions);
}

The getPartition() method receives a key and a value and the number of partitions to split the data across; a number in the range [0, numPartitions) must be returned by this method, indicating which partition to send the key and value to. For any two keys k1 and k2, k1.equals(k2) implies getPartition(k1, *, n) == getPartition(k2, *, n).

The default Partitioner implementation is called HashPartitioner. It uses the hashCode() method of the key objects modulo the number of partitions total to determine which partition to send a given (key, value) pair to.
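
Its effect is roughly the following (a sketch of the idea, not the exact Hadoop source):

public int getPartition(K key, V value, int numPartitions) {
  // mask off the sign bit so the result of the modulus is non-negative
  return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
}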

For most randomly-distributed data, this should result in all partitions being of roughly equal size. If the data in your data set is skewed in some way, however, this might not produce good results. For example, if you know that the key 0 will appear much more frequently than any other key, then you may want to send all the 0-keyed data to one partition, and distribute the other keys over all other partitions by their hashCode(). Also, if the hashCode() method for your data type does not provide uniformly-distributed values over its range, then data may not be sent to reducers evenly. Poor partitioning of data means that some reducers will have more data input than others, which usually means they'll have more work to do than the other reducers. Thus the entire job will wait for one reducer to finish its extra-large share of the load, when it might have been possible to spread that across many different reducers.

If you are dissatisfied with the performance of HashPartitioner, you are free to write your own Partitioner implementation. It can be general-purpose, or tailored to the specific data types or values that you expect to use in your application. After implementing the Partitioner interface, use the JobConf.setPartitionerClass() method to tell Hadoop to use it for your job.
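
For instance, a sketch of a Partitioner for the skewed-key scenario described above might look like the following (the class name and the special key "0" are illustrative); the driver would then register it with conf.setPartitionerClass(SkewAwarePartitioner.class):

public class SkewAwarePartitioner implements Partitioner<Text, Text> {

  public void configure(JobConf conf) {
    // no configuration needed for this example
  }

  public int getPartition(Text key, Text value, int numPartitions) {
    if (numPartitions == 1 || key.toString().equals("0")) {
      // send the very frequent key "0" to a partition of its own
      return 0;
    }
    // spread every other key over the remaining partitions by hash code
    return 1 + (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
  }
}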

Output Formats

The InputFormat and RecordReader interfaces define how data is read into a MapReduce program. By analogy, the OutputFormat and RecordWriter interfaces dictate how to write the results of a job back to the underlying permanent storage. Several useful OutputFormat implementations are described in Module 4. The default format (TextOutputFormat) will write (key, value) pairs as strings to individual lines of an output file (using the toString() methods of the keys and values). The SequenceFileOutputFormat will keep the data in binary, so it can be later read quickly by the SequenceFileInputFormat. These classes make use of the write() and readFields() methods of the specific Writable classes used by your MapReduce pass.

You can define your own OutputFormat implementation that will write data to an underlying medium in the format that you control. If you want to write to output files on the local system or in HDFS, you should extend the FileOutputFormat abstract class. When you want to use a different output format, you can control this with the JobConf.setOutputFormat() method.

Why might we want to define our own OutputFormat? A custom OutputFormat allows you to exactly control what data is put into a file, and how it is laid out. Suppose another process you use has a custom input file format. Your MapReduce job is supposed to generate inputs compatible with this program. You may develop an OutputFormat implementation which will produce the correct type of file to work with this subsequent process in your tool chain. As an example of how to write an OutputFormat, we will walk through the implementation of a simple XML-based format developed for this tutorial, XmlOutputFormat. Given a set of (key, value) pairs from the Reducer (e.g., (k1, v1), (k2, v2), etc.), this will generate a file laid out like so:

<results>
<k1>v1</k1>
<k2>v2</k2>
...
</results>

The code to generate these files is presented below:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class XmlOutputFormat<K, V> extends FileOutputFormat<K, V> {

  protected static class XmlRecordWriter<K, V> implements RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";

    private DataOutputStream out;

    public XmlRecordWriter(DataOutputStream out) throws IOException {
      this.out = out;
      // write the XML prologue: open the enclosing <results> element
      out.writeBytes("<results>\n");
    }

    /**
     * Write the object to the byte stream, handling Text as a special case.
     *
     * @param o
     *          the object to print
     * @throws IOException
     *           if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    private void writeKey(Object o, boolean closing) throws IOException {
      out.writeBytes("<");
      if (closing) {
        out.writeBytes("/");
      }
      writeObject(o);
      out.writeBytes(">");
      if (closing) {
        out.writeBytes("\n");
      }
    }

    public synchronized void write(K key, V value) throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;

      if (nullKey && nullValue) {
        return;
      }

      Object keyObj = key;

      if (nullKey) {
        keyObj = "value";
      }

      writeKey(keyObj, false);

      if (!nullValue) {
        writeObject(value);
      }

      writeKey(keyObj, true);
    }

    public synchronized void close(Reporter reporter) throws IOException {
      try {
        // write the XML epilogue: close the enclosing <results> element
        out.writeBytes("</results>\n");
      } finally {
        // even if writeBytes() fails, make sure we close the stream
        out.close();
      }
    }
  }

  public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    Path file = FileOutputFormat.getTaskOutputPath(job, name);
    FileSystem fs = file.getFileSystem(job);
    FSDataOutputStream fileOut = fs.create(file, progress);
    return new XmlRecordWriter<K, V>(fileOut);
  }
}

The FileOutputFormat which XmlOutputFormat subclasses will handle most of the heavy lifting. The only method directly implemented in XmlOutputFormat is getRecordWriter(), which is a factory method for the RecordWriter object which will actually write the file. The inner class XmlRecordWriter is the implementation which generates files in the format shown above. The RecordWriter is initialized with an output stream connected to a file in the output file system. At the same time, the XML prologue is written into the output file. The particular output file system and filename associated with this output stream are determined based on the current job configuration. The XmlRecordWriter's write() method is then called each time a (key, value) pair is provided to the OutputCollector by the Reducer. When the Reducer finishes, the close() method of the XmlRecordWriter will write the XML epilogue and close the underlying stream.
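
A job would then select this format in its driver, for example (the output path is illustrative):

conf.setOutputFormat(XmlOutputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path("xml-output"));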

Input Formats

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats. These types are described in more detail in Module 4.

More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.

Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called "splits" and are encapsulated in instances of the InputSplit interface. Most files, for example, are split up on the boundaries of the underlying blocks in HDFS, and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on application-specific data. Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. When dividing the data into input splits, it is important that this process be quick and cheap. The data itself should not need to be accessed to perform this process (as it is all done by a single machine at the start of the MapReduce job).

The TextInputFormat divides files into splits strictly by byte offsets. It then reads individual lines of the files from the split in as record inputs to the Mapper. The RecordReader associated with TextInputFormat must be robust enough to handle the fact that the splits do not necessarily correspond neatly to line-ending boundaries. In fact, the RecordReader will read past the theoretical end of a split to the end of a line in one record. The reader associated with the next split in the file will scan for the first full line in the split to begin processing that fragment. All RecordReader implementations must use some similar logic to ensure that they do not miss records that span InputSplit boundaries.

Custom File Formats

In this section we will describe how to develop a custom InputFormat that reads files of a particular format.

Rather than implement InputFormat directly, it is usually best to subclass the FileInputFormat. This abstract class provides much of the basic handling necessary to manipulate files. If we want to parse the file in a particular way, then we must override the getRecordReader() method, which returns an instance of RecordReader: an object that can read from the input source. To motivate this discussion with concrete code, we will develop an InputFormat and RecordReader implementation which can read lists of objects and positions from files. We assume that we are reading text files where each line contains the name of an object and then its coordinates as a set of three comma-separated floating-point values. For instance, some sample data may look like the following:

ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1

We must read individual lines of the file, separate the key (Text) from the three floats, and then read those into a Point3D object like the one developed in the Custom Data Types section of this document.

The ObjectPositionInputFormat class itself is very straightforward. Since it will be reading from files, all we need to do is define a factory method for RecordReader implementations:

public class ObjectPositionInputFormat extends
    FileInputFormat<Text, Point3D> {

  public RecordReader<Text, Point3D> getRecordReader(
      InputSplit input, JobConf job, Reporter reporter)
      throws IOException {

    reporter.setStatus(input.toString());
    return new ObjPosRecordReader(job, (FileSplit)input);
  }
}

Listing 5.3: InputFormat for object-position files

Note that we define the types of the keys and values emitted by the InputFormat in its definition; these must match the types read in as input by the Mapper in its class definition.

The RecordReader implementation is where the actual file information is read and parsed. We will implement this by making use of the LineRecordReader class; this is the RecordReader implementation used by TextInputFormat to read individual lines from files and return them unparsed. We will wrap the LineRecordReader with our own implementation which converts the values to the expected types. By using LineRecordReader, we do not need to worry about what happens if a record spans an InputSplit boundary, since this underlying record reader already has logic to take care of this fact.

class ObjPosRecordReader implements RecordReader<Text, Point3D> {

  private LineRecordReader lineReader;
  private LongWritable lineKey;
  private Text lineValue;

  public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
    lineReader = new LineRecordReader(job, split);

    lineKey = lineReader.createKey();
    lineValue = lineReader.createValue();
  }

  public boolean next(Text key, Point3D value) throws IOException {
    // get the next line
    if (!lineReader.next(lineKey, lineValue)) {
      return false;
    }

    // parse the lineValue which is in the format:
    // objName, x, y, z
    String [] pieces = lineValue.toString().split(",");
    if (pieces.length != 4) {
      throw new IOException("Invalid record received");
    }

    // try to parse floating point components of value
    float fx, fy, fz;
    try {
      fx = Float.parseFloat(pieces[1].trim());
      fy = Float.parseFloat(pieces[2].trim());
      fz = Float.parseFloat(pieces[3].trim());
    } catch (NumberFormatException nfe) {
      throw new IOException("Error parsing floating point value in record");
    }

    // now that we know we'll succeed, overwrite the output objects

    key.set(pieces[0].trim()); // objName is the output key.

    value.x = fx;
    value.y = fy;
    value.z = fz;

    return true;
  }

  public Text createKey() {
    return new Text("");
  }

  public Point3D createValue() {
    return new Point3D();
  }

  public long getPos() throws IOException {
    return lineReader.getPos();
  }

  public void close() throws IOException {
    lineReader.close();
  }

  public float getProgress() throws IOException {
    return lineReader.getProgress();
  }
}

Listing 5.4: RecordReader for object-position files

You can control the InputFormat used by your MapReduce job with the JobConf.setInputFormat() method.
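
For example, to select the ObjectPositionInputFormat developed above:

conf.setInputFormat(ObjectPositionInputFormat.class);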

Exercise: Write an InputFormat and RecordReader that read strings of text separated by '$' characters instead of newlines.

Alternate Data Sources

An InputFormat describes both how to present the data to the Mapper and where the data originates from. Most implementations descend from FileInputFormat, which reads from files on the local machine or HDFS. If your data does not come from a source like this, you can write an InputFormat implementation that reads from an alternate source. For example, HBase (a distributed database system) provides a TableInputFormat that reads records from a database table. You could imagine a system where data is streamed to each machine over the network on a particular port; the InputFormat reads data from the port and parses it into individual records for mapping.

Custom Data Types

Hadoop MapReduce uses typed data at all times when it interacts with user-provided Mappers and Reducers: data read from files into Mappers, emitted by mappers to reducers, and emitted by reducers into output files is all stored in Java objects.

Writable Types

Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. Hadoop provides several stock classes which implement Writable: Text (which stores String data), IntWritable, LongWritable, FloatWritable, BooleanWritable, and several others. The entire list is in the org.apache.hadoop.io package of the Hadoop source (see the API reference).

In addition to these types, you are free to define your own classes which implement Writable. You can organize a structure of virtually any layout to fit your data and be transmitted by Hadoop. As a motivating example, consider a mapper which emits key-value pairs where the key is the name of an object, and the value is its coordinates in some 3-dimensional space. The key is some string-based data, and the value is a structure of the form:

struct point3d {
  float x;
  float y;
  float z;
};

The key can be represented as a Text object, but what about the value? How do we build a Point3D class which Hadoop can transmit? The answer is to implement the Writable interface, which requires two methods:

public interface Writable {
  void readFields(DataInput in) throws IOException;
  void write(DataOutput out) throws IOException;
}

The first of these methods initializes all of the fields of the object based on data contained in the binary stream in. The latter writes all the information needed to reconstruct the object to the binary stream out. The DataInput and DataOutput interfaces (part of java.io) contain methods to serialize most basic types of data; the important contract between your readFields() and write() methods is that they read and write the data from and to the binary stream in the same order. The following code implements a Point3D class usable by Hadoop:

public class Point3D implements Writable {
  public float x;
  public float y;
  public float z;

  public Point3D(float x, float y, float z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }

  public Point3D() {
    this(0.0f, 0.0f, 0.0f);
  }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }

  public String toString() {
    return Float.toString(x) + ", "
        + Float.toString(y) + ", "
        + Float.toString(z);
  }
}

Listing 5.1: A Point class which implements Writable

Custom Key Types

As written, the Point3D type will work as a value type like we require for the mapper problem described above. But what if we want to emit Point3D objects as keys too? In Hadoop MapReduce, if (key, value) pairs sent to a single reduce task include multiple keys, the reducer will process the keys in sorted order. So key types must implement a stricter interface, WritableComparable. In addition to being Writable so they can be transmitted over the network, they also obey Java's Comparable interface. The following code listing extends Point3D to meet this interface:

public class Point3D implements WritableComparable<Point3D> {
  public float x;
  public float y;
  public float z;

  public Point3D(float x, float y, float z) {
    this.x = x;
    this.y = y;
    this.z = z;
  }

  public Point3D() {
    this(0.0f, 0.0f, 0.0f);
  }

  public void write(DataOutput out) throws IOException {
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
  }

  public void readFields(DataInput in) throws IOException {
    x = in.readFloat();
    y = in.readFloat();
    z = in.readFloat();
  }

  public String toString() {
    return Float.toString(x) + ", "
        + Float.toString(y) + ", "
        + Float.toString(z);
  }

  /** return the Euclidean distance from (0, 0, 0) */
  public float distanceFromOrigin() {
    return (float)Math.sqrt(x*x + y*y + z*z);
  }

  public int compareTo(Point3D other) {
    float myDistance = distanceFromOrigin();
    float otherDistance = other.distanceFromOrigin();

    return Float.compare(myDistance, otherDistance);
  }

  public boolean equals(Object o) {
    if (!(o instanceof Point3D)) {
      return false;
    }

    Point3D other = (Point3D)o;
    return this.x == other.x && this.y == other.y
        && this.z == other.z;
  }

  public int hashCode() {
    return Float.floatToIntBits(x)
        ^ Float.floatToIntBits(y)
        ^ Float.floatToIntBits(z);
  }
}

Listing 5.2: A WritableComparable version of Point3D

It is important for key types to implement hashCode() as well; the section on Partitioning Data earlier in this document explains why. The methods hashCode() and equals() have been provided in this version of the class as well.

Using Custom Types

Now that you have created a custom data type, Hadoop must be told to use it. You can control the output key or value data type for a job by using the setOutputKeyClass() and setOutputValueClass() methods of the JobConf object that defines your job. By default, this will set the types expected as output from both the map and reduce phases. If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer. The types delivered as input to the Mapper are governed by the InputFormat used; see the next section of this module for more details.
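
For example (a sketch; the type choices are illustrative), a job whose mappers emit (Text, Point3D) intermediate pairs but whose reducers write (Text, Text) output might be configured as:

// final (reduce) output types
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);

// intermediate (map) output types, which the reducers receive as input
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Point3D.class);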

Faster Comparison Operations

The default sorting process for keys will read instances of the key type in from a stream, parsing the byte stream with the readFields() method of the key class, and then call the compareTo() method of the key class on the two objects. For faster performance, it may be possible to decide on an ordering between two keys just by looking at the byte streams and without parsing all of the data contained therein. For example, consider comparing strings of text. If characters are read in sequentially, then a decision can be made on their ordering as soon as a character position is found where the two strings differ. Even if all of the bytes for the object must be read in, the object itself does not necessarily need to be instantiated around those bytes. To support this higher-speed sorting mechanism, you can extend the WritableComparator class with a comparator specific to your own data type. In particular, the method which should be overridden is

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)

The default implementation is in the class org.apache.hadoop.io.WritableComparator. The relevant method has been reproduced here:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1); // parse key1
    key1.readFields(buffer);

    buffer.reset(b2, s2, l2); // parse key2
    key2.readFields(buffer);

  } catch (IOException e) {
    throw new RuntimeException(e);
  }

  return compare(key1, key2); // compare them
}

Its operation is exactly as described above; it performs the straightforward comparison of the two objects after they have been individually deserialized from their separate byte streams (the b variables), which each have their own start offset (s) and length (l) attributes. Both objects must be fully constructed and deserialized before comparison can occur. The Text class, on the other hand, allows incremental comparison via its own implementation of this method. The code from org.apache.hadoop.io.Text is shown here:

/** A WritableComparator optimized for Text keys. */
public static class Comparator extends WritableComparator {
  public Comparator() {
    super(Text.class);
  }

  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    int n1 = WritableUtils.decodeVIntSize(b1[s1]);
    int n2 = WritableUtils.decodeVIntSize(b2[s2]);
    return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
  }
}

The Text object is serialized by first writing its length field to the byte stream, followed by the UTF-encoded string. The method decodeVIntSize determines the length of the integer describing the length of the byte stream. The comparator then skips these bytes, directly comparing the UTF-encoded bytes of the actual string-portion of the stream in the compareBytes() method. As soon as it finds a character in which the two streams differ, it returns a result without examining the rest of the strings.

Note that you do not need to manually specify this comparator's use in your Hadoop programs. Hadoop automatically uses this special comparator implementation for Text data due to the following code being added to Text's static initialization:

static {
  // register this comparator
  WritableComparator.define(Text.class, new Comparator());
}
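
A similar (hypothetical) raw comparator could be written for the Point3D key type from Listing 5.2. The sketch below, which assumes WritableComparator's static readFloat() helper and would be placed inside the Point3D class, orders serialized points by their distance from the origin, consistent with Point3D.compareTo():

/** A WritableComparator that compares serialized Point3D keys directly. */
public static class Comparator extends WritableComparator {
  public Comparator() {
    super(Point3D.class);
  }

  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    // a Point3D is serialized as three consecutive floats: x, y, z
    return Float.compare(distanceSquared(b1, s1), distanceSquared(b2, s2));
  }

  private static float distanceSquared(byte[] bytes, int start) {
    float x = readFloat(bytes, start);
    float y = readFloat(bytes, start + 4);
    float z = readFloat(bytes, start + 8);
    // the squared distance orders points the same way as the true distance
    return x * x + y * y + z * z;
  }
}

static {
  // register the raw comparator so Hadoop uses it when sorting Point3D keys
  WritableComparator.define(Point3D.class, new Comparator());
}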

Final Writable Notes

Defining custom writable types allows you to intelligently use Hadoop to manipulate higher-level data structures, without needing to use toString() to convert all your data types to text for sending over the network. If you will be using a type in a lot of MapReduce jobs, or you must process a very large volume of them (as is usually the case in Hadoop), defining your own data type classes will provide a significant performance benefit.

Exercise: Assume that we have a mapper which emits line segments as keys and values. A line segment is defined by its endpoints. For our purposes, line segments can be ordered by their lengths. Implement a LineSegment class which implements WritableComparable. Hint: make use of Point3D objects.

Solution to Inverted Index Code

The following source code implements a solution to the inverted indexer problem posed at the checkpoint. The source code is structurally very similar to the source for Word Count; only a few lines really need to be modified.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LineIndexer {

  public static class LineIndexMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, Text> {

    private final static Text word = new Text();
    private final static Text location = new Text();

    public void map(LongWritable key, Text val,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
      String fileName = fileSplit.getPath().getName();
      location.set(fileName);

      String line = val.toString();
      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, location);
      }
    }
  }

  public static class LineIndexReducer extends MapReduceBase
      implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

      boolean first = true;
      StringBuilder toReturn = new StringBuilder();
      while (values.hasNext()) {
        if (!first)
          toReturn.append(", ");
        first = false;
        toReturn.append(values.next().toString());
      }

      output.collect(key, new Text(toReturn.toString()));
    }
  }

  /**
   * The actual main() method for our program; this is the
   * "driver" for the MapReduce job.
   */
  public static void main(String[] args) {
    JobClient client = new JobClient();
    JobConf conf = new JobConf(LineIndexer.class);

    conf.setJobName("LineIndexer");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(conf, new Path("input"));
    FileOutputFormat.setOutputPath(conf, new Path("output"));

    conf.setMapperClass(LineIndexMapper.class);
    conf.setReducerClass(LineIndexReducer.class);

    client.setConf(conf);

    try {
      JobClient.runJob(conf);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Additional Language Support

Hadoop itself is written in Java; it thus accepts Java code natively for Mappers and Reducers. Hadoop also comes with two adapter layers which allow code written in other languages to be used in MapReduce programs.

Pipes

Pipes is a library which allows C++ source code to be used for Mapper and Reducer code. Applications which require high numerical performance may see better throughput if written in C++ and used through Pipes. This library is supported on 32-bit Linux installations.

The include files and static libraries are present in the c++/Linux-i386-32/ directory under your Hadoop installation. Your application should include include/hadoop/Pipes.hh and TemplateFactory.hh and link against lib/libhadooppipes.a; with gcc, include the arguments -L${HADOOP_HOME}/c++/Linux-i386-32/lib -lhadooppipes to do the latter.

Both key and value inputs to pipes programs are provided as STL strings (std::string). A program must still define an instance of Mapper and Reducer; these names have not changed. (They, like all other classes defined in Pipes, are in the HadoopPipes namespace.) Unlike the classes of the same names in Hadoop itself, the map() and reduce() functions take in a single argument which is a reference to an object of type MapContext and ReduceContext respectively. The most important methods contained in each of these context objects are:

const std::string& getInputKey();
const std::string& getInputValue();
void emit(const std::string& key, const std::string& value);

The ReduceContext class also contains an additional method to advance the value iterator:

bool nextValue();

Defining a Pipes Program: A program to use with Pipes is defined by writing classes extending Mapper and Reducer. (And optionally, Partitioner; see Module 5.) Hadoop must then be informed which classes to use to run the job.

An instance of your C++ program will be started by the Pipes framework in main() on each machine. This should do any (hopefully brief) configuration required for your task. It should then define a Factory to create Mapper and Reducer instances as necessary, and then run the job by calling the runTask() method. The simplest way to define a factory is with the following code:

#include"TemplateFactory.hh"
using namespace HadoopPipes;

void main() {
// classes are indicated to the factory via templates
// TODO: Substitute your own class names in below.
TemplateFactory2 factory();

// do any configuration you need to do here

// start the task
bool result = runTask(factory);
}


Running a Pipes Program: After a Pipes program has been written and compiled, it can be launched as a job with the following command (run from your Hadoop home directory):

$ bin/hadoop pipes -input inputPath -output outputPath -program path/to/pipes/program/executable


This will deploy your Pipes program on all nodes and run the MapReduce job through it. By running bin/hadoop pipes with no options, you can see additional usage information which describes how to set additional configuration values as necessary.

The Pipes API contains additional functionality to allow you to read settings from the JobConf, override the Partitioner class, and use RecordReaders in a more direct fashion for higher performance. See the header files in c++/Linux-i386-32/include/hadoop for more information.

Hadoop Streaming

Whereas Pipes is an API that provides close coupling between C++ application code and Hadoop, Streaming is a generic API that allows programs written in virtually any language to be used as Hadoop Mapper and Reducer implementations.

The official Hadoop documentation contains a thorough introduction to Streaming, and briefer notes on the wiki. A brief overview is presented here.

Hadoop Streaming allows you to use arbitrary programs for the Mapper and Reducer phases of a MapReduce job. Both Mappers and Reducers receive their input on stdin and emit output (key, value) pairs on stdout.

Input and output are always represented textually in Streaming. The input (key, value) pairs are written to stdin for a Mapper or Reducer, with a 'tab' character separating the key from the value. The Streaming programs should split the input on the first tab character on the line to recover the key and the value. Streaming programs write their output to stdout in the same format: key \t value \n.

The inputs to the reducer are sorted so that while each line contains only a single (key, value) pair, all the values for the same key are adjacent to one another.

Provided it can handle its input in the text format described above, any Linux program or tool can be used as the mapper or reducer in Streaming. You can also write your own scripts in bash, python, perl, or another language of your choice, provided that the necessary interpreter is present on all nodes in your cluster.

Running a Streaming Job: To run a job with Hadoop Streaming, use the following command:

$ bin/hadoop jar contrib/streaming/hadoop-version-streaming.jar


The command as shown, with no arguments, will print some usage information. An example of how to run real commands is given below:

$ bin/hadoop jar contrib/streaming/hadoop-0.18.0-streaming.jar -mapper \
myMapProgram -reducer myReduceProgram -input /some/dfs/path \
-output /some/other/dfs/path

This assumes that myMapProgram and myReduceProgram are present on all nodes in the system ahead of time. If this is not the case, but they are present on the node launching the job, then they can be "shipped" to the other nodes with the -file option:

$ bin/hadoop jar contrib/streaming/hadoop-0.18.0-streaming.jar -mapper \
myMapProgram -reducer myReduceProgram -file \
myMapProgram -file myReduceProgram -input some/dfs/path \
-output some/other/dfs/path

Any other support files necessary to run your program can be shipped in this manner as well.

More Tips

Chaining Jobs

Not every problem can be solved with a MapReduce program, and fewer still can be solved with a single MapReduce job. Many problems can be solved with MapReduce by writing several MapReduce steps which run in series to accomplish a goal:

Map1 -> Reduce1 -> Map2 -> Reduce2 -> Map3...

You can easily chain jobs together in this fashion by writing multiple driver methods, one for each job. Call the first driver method, which uses JobClient.runJob() to run the job and wait for it to complete. When that job has completed, call the next driver method, which creates a new JobConf object referring to different instances of Mapper and Reducer, etc. The first job in the chain should write its output to a path which is then used as the input path for the second job. This process can be repeated for as many jobs as are necessary to arrive at a complete solution to the problem.
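
A sketch of such a chain (class names and paths are illustrative, and the code would live in a driver method declared to throw IOException) might look like:

// pass 1: its output directory becomes the input of pass 2
JobConf first = new JobConf(MyDriver.class);
first.setJobName("pass-1");
first.setMapperClass(FirstMapper.class);
first.setReducerClass(FirstReducer.class);
FileInputFormat.addInputPath(first, new Path("input"));
FileOutputFormat.setOutputPath(first, new Path("intermediate"));
JobClient.runJob(first);          // blocks until pass 1 completes

// pass 2: reads the intermediate data and writes the final output
JobConf second = new JobConf(MyDriver.class);
second.setJobName("pass-2");
second.setMapperClass(SecondMapper.class);
second.setReducerClass(SecondReducer.class);
FileInputFormat.addInputPath(second, new Path("intermediate"));
FileOutputFormat.setOutputPath(second, new Path("output"));
JobClient.runJob(second);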

Many problems which at first seem impossible in MapReduce can be accomplished by dividing one job into two or more.

Hadoop provides another mechanism for managing batches of jobs with dependencies between them. Rather than submit a JobConf to the JobClient's runJob() or submitJob() methods, org.apache.hadoop.mapred.jobcontrol.Job objects can be created to represent each job; a Job takes a JobConf object as its constructor argument. Jobs can depend on one another through the use of the addDependingJob() method. The code:

x.addDependingJob(y)

says that Job x cannot start until y has successfully completed. Dependency information cannot be added to a job after it has already been started. Given a set of jobs, these can be passed to an instance of the JobControl class. JobControl can receive individual jobs via the addJob() method, or a collection of jobs via addJobs(). The JobControl object will spawn a thread in the client to launch the jobs. Individual jobs will be launched when their dependencies have all successfully completed and when the MapReduce system as a whole has resources to execute the jobs. The JobControl interface allows you to query it to retrieve the state of individual jobs, as well as the list of jobs waiting, ready, running, and finished. The job submission process does not begin until the run() method of the JobControl object is called.
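
A sketch of this (conf1 and conf2 are JobConf objects for two stages, defined elsewhere; the group name is illustrative) might look like:

Job stage1 = new Job(conf1);
Job stage2 = new Job(conf2);
stage2.addDependingJob(stage1);   // stage2 cannot start until stage1 succeeds

JobControl control = new JobControl("two-stage-pipeline");
control.addJob(stage1);
control.addJob(stage2);

// run() drives the submission loop; launch it in its own thread and wait
Thread controller = new Thread(control);
controller.start();
while (!control.allFinished()) {
  try {
    Thread.sleep(1000);
  } catch (InterruptedException ie) {
    // ignore and keep polling
  }
}
control.stop();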

Troubleshooting: Debugging MapReduce

When writing MapReduce programs, you will occasionally encounter bugs in your programs, infinite loops, etc. This section describes the features of MapReduce that will help you diagnose and solve these conditions.

Log Files: Hadoop keeps logs of important events during program execution. By default, these are stored in the logs/ subdirectory of the hadoop-version/ directory where you run Hadoop from. Log files are named hadoop-username-service-hostname.log. The most recent data is in the .log file; older logs have their date appended to them. The username in the log filename refers to the username under which Hadoop was started -- this is not necessarily the same username you are using to run programs. The service name refers to which of the several Hadoop programs is writing the log; these can be jobtracker, namenode, datanode, secondarynamenode, or tasktracker. All of these are important for debugging a whole Hadoop installation. But for individual programs, the tasktracker logs will be the most relevant. Any exceptions thrown by your program will be recorded in the tasktracker logs.

The log directory will also have a subdirectory called userlogs. Here there is another subdirectory for every task run. Each task records its stdout and stderr to two files in this directory. Note that on a multi-node Hadoop cluster, these logs are not centrally aggregated -- you should check each task node's logs/userlogs/ directory for its tasks' output.

Debugging in the distributed setting is complicated and requires logging into several machines to access log data. If possible, programs should be unit tested by running Hadoop locally. The default configuration deployed by Hadoop runs in "single instance" mode, where the entire MapReduce program is run in the same instance of Java that called JobClient.runJob(). Using a debugger like Eclipse, you can then set breakpoints inside the map() or reduce() methods to discover your bugs.

As described earlier in this document, you can use additional features of MapReduce to distribute auxiliary code to nodes in the system. This can be used to enable debug scripts which run on machines when tasks fail.

Listing and Killing Jobs:

It is possible to submit jobs to a Hadoop cluster which malfunction and send themselves into infinite loops or other problematic states. In this case, you will want to manually kill the job you have started.

The following command, run in the Hadoop installation directory on a Hadoop cluster, will list all the current jobs:

$ bin/hadoop job -list

This will produce output that looks something like:

1 jobs currently running
JobId State StartTime UserName
job_200808111901_0001 1 1218506470390 aaron

You can use this job id to kill the job; the command is:

$ bin/hadoop job -kill jobid

Substitute the "job_2008..." from the -list command for jobid.

Checkpoint

You now know about all of the basic operations of the Hadoop MapReduce platform. Try the following exercise, to see if you understand the MapReduce programming concepts.

Exercise: Given the code for WordCount in listings 2 and 3, modify this code to produce an inverted index of its inputs. An inverted index returns a list of documents that contain each word in those documents. Thus, if the word "cat" appears in documents A and B, but not C, then the line:

cat A, B

should appear in the output. If the word "baseball" appears in documents B and C, then the line:

baseball B, C

should appear in the output as well.

If you get stuck, read the section on troubleshooting below. The working solution is provided at the end of this module.

Hint: The default InputFormat will provide the Mapper with (key, value) pairs where the key is the byte offset into the file, and the value is a line of text. To get the filename of the current input, use the following code:

FileSplit fileSplit = (FileSplit)reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();

Additional MapReduce Functionality

Combiner: The pipeline shown earlier omits a processing step which can be used for optimizing bandwidth usage by your MapReduce job. Called the Combiner, this pass runs after the Mapper and before the Reducer. Usage of the Combiner is optional. If this pass is suitable for your job, instances of the Combiner class are run on every node that has run map tasks. The Combiner will receive as input all data emitted by the Mapper instances on a given node. The output from the Combiner is then sent to the Reducers, instead of the output from the Mappers. The Combiner is a "mini-reduce" process which operates only on data generated by one machine.

Word count is a prime example of a job where a Combiner is useful. The Word Count program in listings 1--3 emits a (word, 1) pair for every instance of every word it sees. So if the same document contains the word "cat" 3 times, the pair ("cat", 1) is emitted three times; all of these are then sent to the Reducer. By using a Combiner, these can be condensed into a single ("cat", 3) pair to be sent to the Reducer. Now each node only sends a single value to the reducer for each word -- drastically reducing the total bandwidth required for the shuffle process, and speeding up the job. The best part of all is that we do not need to write any additional code to take advantage of this! If a reduce function is both commutative and associative, then it can be used as a Combiner as well. You can enable combining in the word count program by adding the following line to the driver:

conf.setCombinerClass(Reduce.class);

The Combiner should be an instance of the Reducer interface. If your Reducer itself cannot be used directly as a Combiner because of commutativity or associativity, you might still be able to write a third class to use as a Combiner for your job.

Fault Tolerance

One of the primary reasons to use Hadoop to run your jobs is due to its high degree of fault tolerance. Even when running jobs on a large cluster where individual nodes or network components may experience high rates of failure, Hadoop can guide jobs toward a successful completion.

The primary way that Hadoop achieves fault tolerance is through restarting tasks. Individual task nodes (TaskTrackers) are in constant communication with the head node of the system, called the JobTracker. If a TaskTracker fails to communicate with the JobTracker for a period of time (by default, 1 minute), the JobTracker will assume that the TaskTracker in question has crashed. The JobTracker knows which map and reduce tasks were assigned to each TaskTracker.

If the job is still in the mapping phase, then other TaskTrackers will be asked to re-execute all map tasks previously run by the failed TaskTracker. If the job is in the reducing phase, then other TaskTrackers will re-execute all reduce tasks that were in progress on the failed TaskTracker.

The output of a completed reduce task has already been written back to HDFS. Thus, if a TaskTracker has already completed two out of three reduce tasks assigned to it, only the third task must be re-executed elsewhere. Map tasks are slightly more complicated: even if a node has completed ten map tasks, the reducers may not have all copied their inputs from the output of those map tasks. If a node has crashed, then its mapper outputs are inaccessible. So any already-completed map tasks must be re-executed to make their results available to the rest of the reducing machines. All of this is handled automatically by the Hadoop platform.

This fault tolerance underscores the need for program execution to be side-effect free. If Mappers and Reducers had individual identities and communicated with one another or the outside world, then restarting a task would require the other nodes to communicate with the new instances of the map and reduce tasks, and the re-executed tasks would need to reestablish their intermediate state. This process is notoriously complicated and error-prone in the general case. MapReduce simplifies this problem drastically by eliminating task identities or the ability for task partitions to communicate with one another. An individual task sees only its own direct inputs and knows only its own outputs, to make this failure and restart process clean and dependable.

Speculative execution: One problem with the Hadoop system is that by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program. For example, if one node has a slow disk controller, it may be reading its input at only 10% of the speed of the other nodes. So when 99 map tasks are already complete, the system is still waiting for the final map task to check in, and that straggler may take far longer than any of the others.

Because tasks run in isolation from one another, individual tasks do not know where their inputs come from. Tasks trust the Hadoop platform to just deliver the appropriate input. Therefore, the same input can be processed multiple times in parallel, to exploit differences in machine capabilities. As most of the tasks in a job are coming to a close, the Hadoop platform will schedule redundant copies of the remaining tasks across several nodes which do not have other work to perform. This process is known as speculative execution. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever copy of each Mapper completed first.

Speculative execution is enabled by default. You can disable speculative execution for the mappers and reducers by setting the mapred.map.tasks.speculative.execution and mapred.reduce.tasks.speculative.execution JobConf options to false, respectively.
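
For example, a driver could switch speculative execution off for both phases on its JobConf (a minimal sketch; conf stands for the job's JobConf object, and the values are set as ordinary configuration properties):

  // disable speculative re-execution of map and reduce tasks for this job only
  conf.set("mapred.map.tasks.speculative.execution", "false");
  conf.set("mapred.reduce.tasks.speculative.execution", "false");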

MapReduce Data Flow

If you have gone through our previous post, you have already seen the components that make up a basic MapReduce job. Now we can see how everything works together at a higher level:


MapReduce inputs typically come from input files loaded onto our processing cluster in HDFS. These files are evenly distributed across all our nodes. Running a MapReduce program involves running mapping tasks on many or all of the nodes in our cluster. Each of these mapping tasks is equivalent: no mappers have particular "identities" associated with them. Therefore, any mapper can process any input file. Each mapper loads the set of files local to that machine and processes them.

When the mapping phase has completed, the intermediate (key, value) pairs must be exchanged between machines to send all values with the same key to a single reducer. The reduce tasks are spread across the same nodes in the cluster as the mappers. This is the only communication step in MapReduce. Individual map tasks do not exchange information with one another, nor are they aware of one another's existence. Similarly, different reduce tasks do not communicate with one another. The user never explicitly marshals information from one machine to another; all data transfer is handled by the Hadoop MapReduce platform itself, guided implicitly by the different keys associated with values. This is a fundamental element of Hadoop MapReduce's reliability. If nodes in the cluster fail, tasks must be able to be restarted. If they have been performing side-effects, e.g., communicating with the outside world, then the shared state must be restored in a restarted task. By eliminating communication and side-effects, restarts can be handled more gracefully.



A Closer Look

The previous figure described the high-level view of Hadoop MapReduce. From this diagram, you can see where the mapper and reducer components of the Word Count application fit in, and how it achieves its objective. We will now examine this system in a bit closer detail.


The next figure shows the pipeline with more of its mechanics exposed. While only two nodes are depicted, the same pipeline can be replicated across a very large number of nodes. The next several paragraphs describe each of the stages of a MapReduce program more precisely.

Input files: This is where the data for a MapReduce task is initially stored. While this does not need to be the case, the input files typically reside in HDFS. The format of these files is arbitrary; while line-based log files can be used, we could also use a binary format, multi-line input records, or something else entirely. It is typical for these input files to be very large -- tens of gigabytes or more.

InputFormat: How these input files are split up and read is defined by the InputFormat. An InputFormat is a class that provides the following functionality:

* Selects the files or other objects that should be used for input
* Defines the InputSplits that break a file into tasks
* Provides a factory for RecordReader objects that read the file

Several InputFormats are provided with Hadoop. An abstract type is called FileInputFormat; all InputFormats that operate on files inherit functionality and properties from this class. When starting a Hadoop job, FileInputFormat is provided with a path containing files to read. The FileInputFormat will read all files in this directory. It then divides these files into one or more InputSplits each. You can choose which InputFormat to apply to your input files for a job by calling the setInputFormat() method of the JobConf object that defines the job. A table of standard InputFormats is given below.

InputFormat               Description                                  Key                                        Value
TextInputFormat           Default format; reads lines of text files    The byte offset of the line                The line contents
KeyValueInputFormat       Parses lines into (key, value) pairs         Everything up to the first tab character   The remainder of the line
SequenceFileInputFormat   A Hadoop-specific binary format              user-defined                               user-defined

The default InputFormat is the TextInputFormat. This treats each line of each input file as a separate record, and performs no parsing. This is useful for unformatted data or line-based records like log files. A more interesting input format is the KeyValueInputFormat. This format also treats each line of input as a separate record. While the TextInputFormat treats the entire line as the value, the KeyValueInputFormat breaks the line itself into the key and value by searching for a tab character. This is particularly useful for reading the output of one MapReduce job as the input to another, as the default OutputFormat (described in more detail below) formats its results in this manner. Finally, the SequenceFileInputFormat reads special binary files that are specific to Hadoop. These files include many features designed to allow data to be rapidly read into Hadoop mappers. Sequence files are block-compressed and provide direct serialization and deserialization of several arbitrary data types (not just text). Sequence files can be generated as the output of other MapReduce tasks and are an efficient intermediate representation for data that is passing from one MapReduce job to another.
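
As a sketch of how an InputFormat is selected in practice, the following assumes a JobConf named conf (as in the driver shown later) and an illustrative input directory; here the SequenceFileInputFormat is chosen instead of the default:

  // read Hadoop sequence files instead of plain text
  conf.setInputFormat(SequenceFileInputFormat.class);
  // add the directory (or file) whose contents should be processed
  FileInputFormat.addInputPath(conf, new Path("/user/aaron/input"));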

InputSplits: An InputSplit describes a unit of work that comprises a single map task in a MapReduce program. A MapReduce program applied to a data set, collectively referred to as a Job, is made up of several (possibly several hundred) tasks. Map tasks may involve reading a whole file; they often involve reading only part of a file. By default, the FileInputFormat and its descendants break a file up into 64 MB chunks (the same size as blocks in HDFS). You can control this value by setting the mapred.min.split.size parameter in hadoop-site.xml, or by overriding the parameter in the JobConf object used to submit a particular MapReduce job. By processing a file in chunks, we allow several map tasks to operate on a single file in parallel. If the file is very large, this can improve performance significantly through parallelism. Even more importantly, since the various blocks that make up the file may be spread across several different nodes in the cluster, it allows tasks to be scheduled on each of these different nodes; the individual blocks are thus all processed locally, instead of needing to be transferred from one node to another. Of course, while log files can be processed in this piece-wise fashion, some file formats are not amenable to chunked processing. By writing a custom InputFormat, you can control how the file is broken up (or is not broken up) into splits. Custom input formats are described in Module 5.
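
For example, a job that wants larger splits could raise the minimum split size on its JobConf (a minimal sketch; conf is the job's JobConf and the 128 MB figure is arbitrary):

  // request splits of at least 128 MB (the value is in bytes) for this job
  conf.set("mapred.min.split.size", "134217728");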

The InputFormat defines the list of tasks that make up the mapping phase; each task corresponds to a single input split. The tasks are then assigned to the nodes in the system based on where the input file chunks are physically resident. An individual node may have several dozen tasks assigned to it. The node will begin working on the tasks, attempting to perform as many in parallel as it can. The on-node parallelism is controlled by the mapred.tasktracker.map.tasks.maximum parameter.

RecordReader: The InputSplit has defined a slice of work, but does not describe how to access it. The RecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper. The RecordReader instance is defined by the InputFormat. The default InputFormat, TextInputFormat, provides a LineRecordReader, which treats each line of the input file as a new value. The key associated with each line is its byte offset in the file. The RecordReader is invoked repeatedly on the input until the entire InputSplit has been consumed. Each invocation of the RecordReader leads to another call to the map() method of the Mapper.

Mapper: The Mapper performs the interesting user-defined work of the first phase of the MapReduce program. Given a key and a value, the map() method emits (key, value) pair(s) which are forwarded to the Reducers. A new instance of Mapper is instantiated in a separate Java process for each map task (InputSplit) that makes up part of the total job input. The individual mappers are intentionally not provided with a mechanism to communicate with one another in any way. This allows the reliability of each map task to be governed solely by the reliability of the local machine. The map() method receives two parameters in addition to the key and the value:

* The OutputCollector object has a method named collect() which will forward a (key, value) pair to the reduce phase of the job.
* The Reporter object provides information about the current task; its getInputSplit() method will return an object describing the current InputSplit. It also allows the map task to provide additional information about its progress to the rest of the system. The setStatus() method allows you to emit a status message back to the user. The incrCounter() method allows you to increment shared performance counters. You may define as many arbitrary counters as you wish. Each mapper can increment the counters, and the JobTracker will collect the increments made by the different processes and aggregate them for later retrieval when the job ends.
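
As a brief sketch of how a mapper's map() method might use the Reporter, consider the fragment below; the MyCounters enum, the status text, and the emitted pair are purely illustrative:

  // a hypothetical counter group for this illustration
  enum MyCounters { INPUT_LINES }

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    // tell the framework what this task is doing and bump a shared counter
    reporter.setStatus("processing byte offset " + key.get());
    reporter.incrCounter(MyCounters.INPUT_LINES, 1);
    // emit the line itself with a count of one (illustrative only)
    output.collect(value, new IntWritable(1));
  }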

Partition & Shuffle: After the first map tasks have completed, the nodes may still be performing several more map tasks each. But they also begin exchanging the intermediate outputs from the map tasks to where they are required by the reducers. This process of moving map outputs to the reducers is known as shuffling. A different subset of the intermediate key space is assigned to each reduce node; these subsets (known as "partitions") are the inputs to the reduce tasks. Each map task may emit (key, value) pairs to any partition; all values for the same key are always reduced together regardless of which mapper is its origin. Therefore, the map nodes must all agree on where to send the different pieces of the intermediate data. The Partitioner class determines which partition a given (key, value) pair will go to. The default partitioner computes a hash value for the key and assigns the partition based on this result. Custom partitioners are described in more detail in Module 5.
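
Conceptually, the default hash-based assignment can be sketched as follows; key and numReduceTasks stand in for the arguments a Partitioner receives:

  // hash the key, clear the sign bit, then take the result modulo the number
  // of reduce tasks; every mapper computes the same partition for a given key
  int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;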

Sort: Each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted by Hadoop before they are presented to the Reducer.

Reduce: A Reducer instance is created for each reduce task. This is an instance of user-provided code that performs the second important phase of job-specific work. For each key in the partition assigned to a Reducer, the Reducer's reduce() method is called once. This receives a key as well as an iterator over all the values associated with the key. The values associated with a key are returned by the iterator in an undefined order. The Reducer also receives as parameters OutputCollector and Reporter objects; they are used in the same manner as in the map() method.

OutputFormat: The (key, value) pairs provided to this OutputCollector are then written to output files. The way they are written is governed by the OutputFormat. The OutputFormat functions much like the InputFormat class described earlier. The instances of OutputFormat provided by Hadoop write to files on the local disk or in HDFS; they all inherit from a common FileOutputFormat. Each Reducer writes a separate file in a common output directory. These files will typically be named part-nnnnn, where nnnnn is the partition id associated with the reduce task. The output directory is set by the FileOutputFormat.setOutputPath() method. You can control which particular OutputFormat is used by calling the setOutputFormat() method of the JobConf object that defines your MapReduce job.

Hadoop provides some OutputFormat instances to write to files. The basic (default) instance is TextOutputFormat, which writes (key, value) pairs on individual lines of a text file. This can be easily re-read by a later MapReduce task using the KeyValueInputFormat class, and is also human-readable. A better intermediate format for use between MapReduce jobs is the SequenceFileOutputFormat which rapidly serializes arbitrary data types to the file; the corresponding SequenceFileInputFormat will deserialize the file into the same types and presents the data to the next Mapper in the same manner as it was emitted by the previous Reducer. The NullOutputFormat generates no output files and disregards any (key, value) pairs passed to it by the OutputCollector. This is useful if you are explicitly writing your own output files in the reduce() method, and do not want additional empty output files generated by the Hadoop framework.
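
As a sketch, a driver that wants its output in sequence-file form for a follow-on job might configure this as shown below (assuming a JobConf named conf and an illustrative output directory):

  // write block-compressed binary output that a later job can read back in
  // with SequenceFileInputFormat
  conf.setOutputFormat(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(conf, new Path("/user/aaron/output"));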

RecordWriter: Much like how the InputFormat actually reads individual records through the RecordReader implementation, the OutputFormat class is a factory for RecordWriter objects; these are used to write the individual records to the files as directed by the OutputFormat.

The output files written by the Reducers are then left in HDFS for your use, either by another MapReduce job, by a separate program, or for human inspection.

MapReduce Basics

Functional Programming Concepts

MapReduce programs are designed to compute large volumes of data in a parallel fashion. This requires dividing the workload across a large number of machines. This model would not scale to large clusters (hundreds or thousands of nodes) if the components were allowed to share data arbitrarily. The communication overhead required to keep the data on the nodes synchronized at all times would prevent the system from performing reliably or efficiently at large scale.

Instead, all data elements in MapReduce are immutable, meaning that they cannot be updated. If in a mapping task you change an input (key, value) pair, it does not get reflected back in the input files; communication occurs only by generating new output (key, value) pairs which are then forwarded by the Hadoop system into the next phase of execution.

List Processing

Conceptually, MapReduce programs transform lists of input data elements into lists of output data elements. A MapReduce program will do this twice, using two different list processing idioms: map, and reduce. These terms are taken from several list processing languages such as LISP, Scheme, or ML.

Mapping Lists

The first phase of a MapReduce program is called mapping. A list of data elements is provided, one at a time, to a function called the Mapper, which transforms each element individually into an output data element.


As an example of the utility of map: Suppose you had a function toUpper(str) which returns an uppercase version of its input string. You could use this function with map to turn a list of strings into a list of uppercase strings. Note that we are not modifying the input string here: we are returning a new string that will form part of a new output list.
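
In plain (non-Hadoop) Java, the mapping idiom might be sketched like this:

  // apply toUpperCase to every element, building a new list; the input list
  // itself is never modified
  List<String> input = Arrays.asList("cat", "dog", "fish");
  List<String> upper = new ArrayList<String>();
  for (String s : input) {
    upper.add(s.toUpperCase());
  }
  // upper now holds ["CAT", "DOG", "FISH"]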

Reducing Lists

Reducing lets you aggregate values together. A reducer function receives an iterator of input values from an input list. It then combines these values together, returning a single output value.



Reducing is often used to produce "summary" data, turning a large volume of data into a smaller summary of itself. For example, "+" can be used as a reducing function, to return the sum of a list of input values.
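
A plain Java sketch of the same reducing idiom, using "+" as the combining function:

  // fold a list of values into a single summary value
  int sum = 0;
  for (int value : Arrays.asList(1, 2, 3, 4)) {
    sum += value;
  }
  // sum is now 10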

Putting Them Together in MapReduce:

The Hadoop MapReduce framework takes these concepts and uses them to process large volumes of information. A MapReduce program has two components: one that implements the mapper, and another that implements the reducer. The Mapper and Reducer idioms described above are extended slightly to work in this environment, but the basic principles are the same.

Keys and values: In MapReduce, no value stands on its own. Every value has a key associated with it. Keys identify related values. For example, a log of time-coded speedometer readings from multiple cars could be keyed by license-plate number; it would look like:



AAA-123 65mph, 12:00pm
ZZZ-789 50mph, 12:02pm
AAA-123 40mph, 12:05pm
CCC-456 25mph, 12:15pm
...

The mapping and reducing functions receive not just values, but (key, value) pairs. The output of each of these functions is the same: both a key and a value must be emitted to the next list in the data flow.

MapReduce is also less strict than formal functional languages about how the Mapper and Reducer work. In more formal functional mapping and reducing settings, a mapper must produce exactly one output element for each input element, and a reducer must produce exactly one output element for each input list. In MapReduce, an arbitrary number of values can be output from each phase; a mapper may map one input into zero, one, or one hundred outputs. A reducer may compute over an input list and emit one or a dozen different outputs.

Keys divide the reduce space: A reducing function turns a large list of values into one (or a few) output values. In MapReduce, all of the output values are not usually reduced together. All of the values with the same key are presented to a single reducer together. This is performed independently of any reduce operations occurring on other lists of values, with different keys attached.



An Example Application: Word Count

A simple MapReduce program can be written to determine how many times different words appear in a set of files. For example, if we had the files:

foo.txt: Sweet, this is the foo file

bar.txt: This is the bar file

We would expect the output to be:

sweet 1
this 2
is 2
the 2
foo 1
bar 1
file 2

Naturally, we can write a program in MapReduce to compute this output. The high-level structure would look like this:

mapper (filename, file-contents):
  for each word in file-contents:
    emit (word, 1)

reducer (word, values):
  sum = 0
  for each value in values:
    sum = sum + value
  emit (word, sum)

Listing 4.1: High-Level MapReduce Word Count

Several instances of the mapper function are created on the different machines in our cluster. Each instance receives a different input file (it is assumed that we have many such files). The mappers output (word, 1) pairs which are then forwarded to the reducers. Several instances of the reducer method are also instantiated on the different machines. Each reducer is responsible for processing the list of values associated with a different word. The list of values will be a list of 1's; the reducer sums up those ones into a final count associated with a single word. The reducer then emits the final (word, count) output which is written to an output file.

We can write a very similar program to this in Hadoop MapReduce; it is included in the Hadoop distribution in src/examples/org/apache/hadoop/examples/WordCount.java. It is partially reproduced below:

public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output,
                  Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, one);
    }
  }
}

/**
 * A reducer class that just emits the sum of the input values.
 */
public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

Listing 4.2: Hadoop MapReduce Word Count Source

There are some minor differences between this actual Java implementation and the pseudo-code shown above. First, Java has no native emit keyword; the OutputCollector object you are given as an input will receive values to emit to the next stage of execution. And second, the default input format used by Hadoop presents each line of an input file as a separate input to the mapper function, not the entire file at a time. It also uses a StringTokenizer object to break up the line into words. This does not perform any normalization of the input, so "cat", "Cat" and "cat," are all regarded as different strings. Note that the class-variable word is reused each time the mapper outputs another (word, 1) pairing; this saves time by not allocating a new variable for each output. The output.collect() method will copy the values it receives as input, so you are free to overwrite the variables you use.


The Driver Method

There is one final component of a Hadoop MapReduce program, called the Driver. The driver initializes the job and instructs the Hadoop platform to execute your code on a set of input files, and controls where the output files are placed. A cleaned-up version of the driver from the example Java implementation that comes with Hadoop is presented below:

public void run(String inputPath, String outputPath) throws Exception {
  JobConf conf = new JobConf(WordCount.class);
  conf.setJobName("wordcount");

  // the keys are words (strings)
  conf.setOutputKeyClass(Text.class);
  // the values are counts (ints)
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(MapClass.class);
  conf.setReducerClass(Reduce.class);

  FileInputFormat.addInputPath(conf, new Path(inputPath));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));

  JobClient.runJob(conf);
}

Listing 4.3: Hadoop MapReduce Word Count Driver

This method sets up a job to execute the word count program across all the files in a given input directory (the inputPath argument). The output from the reducers is written into files in the directory identified by outputPath. The configuration information to run the job is captured in the JobConf object. The mapping and reducing functions are identified by the setMapperClass() and setReducerClass() methods. The data types emitted by the reducer are identified by setOutputKeyClass() and setOutputValueClass(). By default, it is assumed that these are the output types of the mapper as well. If this is not the case, the setMapOutputKeyClass() and setMapOutputValueClass() methods of the JobConf class will override these. The input types fed to the mapper are controlled by the InputFormat used. Input formats are discussed in more detail below. The default input format, "TextInputFormat," will load data in as (LongWritable, Text) pairs. The long value is the byte offset of the line in the file. The Text object holds the string contents of the line of the file.
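
For example, if the mapper emitted (Text, LongWritable) pairs while the reducer emitted (Text, IntWritable) pairs, the driver would also need lines like the following (a sketch; the types here are illustrative):

  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(LongWritable.class);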

The call to JobClient.runJob(conf) will submit the job to MapReduce. This call will block until the job completes. If the job fails, it will throw an IOException. JobClient also provides a non-blocking version called submitJob().
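
A sketch of the non-blocking style, assuming the JobConf built in Listing 4.3; runAsync is a hypothetical helper and the polling interval is arbitrary:

  public void runAsync(JobConf conf) throws Exception {
    JobClient client = new JobClient(conf);
    RunningJob running = client.submitJob(conf);  // returns immediately
    while (!running.isComplete()) {
      Thread.sleep(5000);                         // poll until the job finishes
    }
  }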

MapReduce Introduction

MapReduce is a programming model designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks. MapReduce programs are written in a particular style influenced by functional programming constructs, specifically idioms for processing lists of data.

This blog explains the nature of this programming model and how it can be used to write programs which run in the Hadoop environment.