Big Data Analytics

Additional Language Support

Hadoop itself is written in Java; it thus accepts Java code natively for Mappers and Reducers. Hadoop also comes with two adapter layers which allow code written in other languages to be used in MapReduce programs.

Pipes is a library that allows Mappers and Reducers to be written in C++. Applications that require high numerical performance may see better throughput if written in C++ and run through Pipes. This library is supported on 32-bit Linux installations.

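The include files and static libraries are present in the c++/Linux-i386-32/ directory under your Hadoop installation. Your application should include include/hadoop/Pipes.hh and include/hadoop/TemplateFactory.hh and link against lib/libhadooppipes.a. With gcc, add -I${HADOOP_HOME}/c++/Linux-i386-32/include so the compiler can find the headers, and -L${HADOOP_HOME}/c++/Linux-i386-32/lib -lhadooppipes to link the library. In practice, the companion library lib/libhadooputils.a (-lhadooputils) and -lpthread are usually needed on the link line as well.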

Both key and value inputs to Pipes programs are provided as STL strings (std::string). A program must still define subclasses of Mapper and Reducer; these names have not changed. (They, like all other classes defined in Pipes, are in the HadoopPipes namespace.) Unlike their Java counterparts, the map() and reduce() methods take a single argument: a reference to a MapContext or a ReduceContext object, respectively. The most important methods of each of these context objects are:

const std::string& getInputKey();
const std::string& getInputValue();
void emit(const std::string& key, const std::string& value);

The ReduceContext class also contains an additional method to advance the value iterator:

bool nextValue();
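To illustrate how the context methods above fit together, the following sketch implements a word-count mapper and reducer. So that it compiles without a Hadoop installation, it declares hypothetical stand-in MapContext and ReduceContext structs that expose only the methods listed above; they are not the real Pipes classes. Likewise, WordCountMapper and WordCountReducer are plain classes here. In an actual Pipes program each would derive from HadoopPipes::Mapper or HadoopPipes::Reducer and provide a constructor taking a HadoopPipes::TaskContext&, which is how TemplateFactory creates them. The reducer loop shows the one subtle point: nextValue() is called before reading each value, including the first.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// HYPOTHETICAL stand-ins for HadoopPipes::MapContext and ReduceContext,
// exposing only the methods described above. Emitted pairs are collected
// in `out` so the behaviour can be inspected without Hadoop.
struct MapContext {
  std::string key, value;
  std::vector<std::pair<std::string, std::string>> out;
  const std::string& getInputKey() { return key; }
  const std::string& getInputValue() { return value; }
  void emit(const std::string& k, const std::string& v) { out.emplace_back(k, v); }
};

struct ReduceContext {
  std::string key;
  std::vector<std::string> values;  // all values for `key`, in order
  std::size_t next = 0;             // index of the next value to expose
  std::string current;              // value returned by getInputValue()
  std::vector<std::pair<std::string, std::string>> out;
  const std::string& getInputKey() { return key; }
  const std::string& getInputValue() { return current; }
  // Advances to the next value; returns false once the values are exhausted.
  bool nextValue() {
    if (next >= values.size()) return false;
    current = values[next++];
    return true;
  }
  void emit(const std::string& k, const std::string& v) { out.emplace_back(k, v); }
};

// Emits (word, "1") for every whitespace-separated word in the input value.
class WordCountMapper {
 public:
  void map(MapContext& context) {
    std::istringstream words(context.getInputValue());
    std::string word;
    while (words >> word) context.emit(word, "1");
  }
};

// Sums the counts for one key. nextValue() must be called before each
// read of getInputValue(), including the first.
class WordCountReducer {
 public:
  void reduce(ReduceContext& context) {
    long sum = 0;
    while (context.nextValue()) sum += std::stol(context.getInputValue());
    context.emit(context.getInputKey(), std::to_string(sum));
  }
};
```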

Defining a Pipes Program: A program to use with Pipes is defined by writing classes extending Mapper and Reducer. (And optionally, Partitioner; see Module 5.) Hadoop must then be informed which classes to use to run the job.

The Pipes framework starts an instance of your C++ program on each machine, entering through main(). main() should perform any (ideally brief) configuration your task requires, then define a Factory that creates Mapper and Reducer instances as needed, and finally run the job by calling the runTask() function. The simplest way to define a factory is with the following code:

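#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"

using namespace HadoopPipes;

int main() {
  // classes are indicated to the factory via templates
  // TODO: Substitute your own class names in below
  // (MyMapper and MyReducer are placeholders for your own classes).
  TemplateFactory2<MyMapper, MyReducer> factory;

  // do any configuration you need to do here

  // start the task; runTask() returns true on success
  bool result = runTask(factory);
  return result ? 0 : 1;
}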

Running a Pipes Program: After a Pipes program has been written and compiled, it can be launched as a job with the following command, run from your Hadoop home directory:

$ bin/hadoop pipes -input inputPath -output outputPath -program path/to/pipes/program/executable

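Hadoop distributes your Pipes executable to the nodes that run the job's tasks and runs the MapReduce job through it. The -program path is generally resolved against the cluster's default filesystem (usually HDFS), so you may need to copy the executable there first, for example with bin/hadoop dfs -put. Running bin/hadoop pipes with no options prints usage information that describes how to set additional configuration values as necessary.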

The Pipes API contains additional functionality to allow you to read settings from the JobConf, override the Partitioner class, and use RecordReaders in a more direct fashion for higher performance. See the header files in c++/Linux-i386-32/include/hadoop for more information.

Hadoop Streaming

Whereas Pipes is an API that provides close coupling between C++ application code and Hadoop, Streaming is a generic API that allows programs written in virtually any language to be used as Hadoop Mapper and Reducer implementations.

The official Hadoop documentation contains a thorough introduction to Streaming, and the Hadoop wiki has briefer notes. A short overview is presented here.

Hadoop Streaming allows you to use arbitrary programs for the Mapper and Reducer phases of a MapReduce job. Both Mappers and Reducers read their input from stdin and write their output (key, value) pairs to stdout.

Input and output are always represented textually in Streaming. The input (key, value) pairs are written to the stdin of a Mapper or Reducer one per line, with a tab character separating the key from the value. Streaming programs should split each input line on its first tab character to recover the key and the value. Streaming programs write their output to stdout in the same format: key \t value \n.
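The parsing and formatting rules above can be sketched in C++ as follows. splitKeyValue and formatRecord are hypothetical helper names, not part of Hadoop. How to handle a line that contains no tab is an assumption of this sketch: it treats the whole line as the key and the value as empty.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Splits one Streaming input line into (key, value) at the FIRST tab.
// Any later tabs stay inside the value. Assumption: a line with no tab
// is treated as a key with an empty value.
std::pair<std::string, std::string> splitKeyValue(const std::string& line) {
  const std::string::size_type tab = line.find('\t');
  if (tab == std::string::npos) return {line, ""};
  return {line.substr(0, tab), line.substr(tab + 1)};
}

// Formats one output record as "key\tvalue\n".
std::string formatRecord(const std::string& key, const std::string& value) {
  return key + '\t' + value + '\n';
}
```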

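The inputs to the reducer are sorted so that, while each line contains only a single (key, value) pair, all the values for the same key are adjacent to one another. A Streaming reducer must therefore detect where one key's group ends by noticing when the key changes from one line to the next.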
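The following sketch shows one way a reducer can exploit this ordering: it keeps a running total and writes it out whenever the key on the current line differs from the previous one, plus once more at end of input. The function name sumByKey is hypothetical, and the sketch assumes every value is an integer and that lines arrive sorted by key. Taking std::istream and std::ostream parameters keeps it testable with in-memory streams; a reducer executable's main() would simply call sumByKey(std::cin, std::cout).

```cpp
#include <cassert>
#include <istream>
#include <ostream>
#include <sstream>  // for testing with in-memory streams
#include <string>

// Sums integer values per key from sorted "key\tvalue" lines and writes
// "key\tsum" lines. Assumes values are integers and input is sorted.
void sumByKey(std::istream& in, std::ostream& out) {
  std::string line, currentKey;
  long total = 0;
  bool haveKey = false;
  while (std::getline(in, line)) {
    const std::string::size_type tab = line.find('\t');
    const std::string key = line.substr(0, tab);  // whole line if no tab
    const std::string value =
        (tab == std::string::npos) ? "0" : line.substr(tab + 1);
    if (haveKey && key != currentKey) {
      out << currentKey << '\t' << total << '\n';  // key boundary: flush
      total = 0;
    }
    currentKey = key;
    haveKey = true;
    total += std::stol(value);
  }
  if (haveKey) out << currentKey << '\t' << total << '\n';  // last group
}
```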

Any Linux program or tool that can handle input in the text format described above can be used as the mapper or reducer in Streaming. You can also write your own scripts in Bash, Python, Perl, or another language of your choice, provided that the necessary interpreter is present on all nodes in your cluster.
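Compiled programs work too. As an example, the sketch below is a word-count mapper in C++ that writes one word\t1 record per whitespace-separated word. The function name wordCountMap is hypothetical, and the sketch assumes each input line is plain text to be tokenized. As with any Streaming program, the resulting binary must be built for, and available on, every node in the cluster.

```cpp
#include <cassert>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

// Word-count mapper for Streaming: reads text lines from `in` and writes
// one "word\t1" record per whitespace-separated word to `out`.
// Assumption: each input line is plain text with no key prefix.
void wordCountMap(std::istream& in, std::ostream& out) {
  std::string line;
  while (std::getline(in, line)) {
    std::istringstream words(line);
    std::string word;
    while (words >> word) out << word << "\t1\n";
  }
}
```

A mapper executable's main() would call wordCountMap(std::cin, std::cout); pairing it with a summing reducer like the one sketched earlier in this section gives a complete word count.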

Running a Streaming Job: To run a job with Hadoop Streaming, use the following command:

$ bin/hadoop jar contrib/streaming/hadoop-version-streaming.jar

The command as shown, with no arguments, prints usage information (replace version in the jar name with your Hadoop release number). An example of how to run a real job is given below:

$ bin/hadoop jar contrib/streaming/hadoop-0.18.0-streaming.jar -mapper \
myMapProgram -reducer myReduceProgram -input /some/dfs/path \
-output /some/other/dfs/path

This assumes that myMapProgram and myReduceProgram are present on all nodes in the system ahead of time. If this is not the case, but they are present on the node launching the job, then they can be "shipped" to the other nodes with the -file option:

$ bin/hadoop jar contrib/streaming/hadoop-0.18.0-streaming.jar -mapper \
myMapProgram -reducer myReduceProgram -file \
myMapProgram -file myReduceProgram -input /some/dfs/path \
-output /some/other/dfs/path

Any other support files necessary to run your program can be shipped in this manner as well.