Big Data Analytics

Custom Data Types

Hadoop MapReduce uses typed data at all times when it interacts with user-provided Mappers and Reducers: data read from files into Mappers, emitted by mappers to reducers, and emitted by reducers into output files is all stored in Java objects.
Writable Types

Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. Hadoop provides several stock classes which implement Writable: Text (which stores String data), IntWritable, LongWritable, FloatWritable, BooleanWritable, and several others. The entire list is in the org.apache.hadoop.io package of the Hadoop source (see the API reference).

In addition to these types, you are free to define your own classes which implement Writable. You can organize a structure of virtually any layout to fit your data and be transmitted by Hadoop. As a motivating example, consider a mapper which emits key-value pairs where the key is the name of an object, and the value is its coordinates in some 3-dimensional space. The key is some string-based data, and the value is a structure of the form:

struct point3d {
float x;
float y;
float z;
}

The key can be represented as a Text object, but what about the value? How do we build a Point3D class which Hadoop can transmit? The answer is to implement the Writable interface, which requires two methods:

public interface Writable {
void readFields(DataInput in);
void write(DataOutput out);
}

The first of these methods initializes all of the fields of the object based on data contained in the binary stream in. The latter writes all the information needed to reconstruct the object to the binary stream out. The DataInput and DataOutput classes (part of java.io) contain methods to serialize most basic types of data; the important contract between your readFields() and write() methods is that they read and write the data from and to the binary stream in the same order. The following code implements a Point3D class usable by Hadoop:

public class Point3D implements Writable {
public float x;
public float y;
public float z;

public Point3D(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
}

public Point3D() {
this(0.0f, 0.0f, 0.0f);
}

public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}

public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
}

public String toString() {
return Float.toString(x) + ", "
+ Float.toString(y) + ", "
+ Float.toString(z);
}
}

Listing 5.1: A Point class which implements Writable
Custom Key Types

As written, the Point3D type will work as a value type like we require for the mapper problem described above. But what if we want to emit Point3D objects as keys too? In Hadoop MapReduce, if (key, value) pairs sent to a single reduce task include multiple keys, the reducer will process the keys in sorted order. So key types must implement a stricter interface, WritableComparable. In addition to being Writable so they can be transmitted over the network, they also obey Java's Comparable interface. The following code listing extends Point3D to meet this interface:

public class Point3D implements WritableComparable {
public float x;
public float y;
public float z;

public Point3D(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
}

public Point3D() {
this(0.0f, 0.0f, 0.0f);
}

public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}

public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
}

public String toString() {
return Float.toString(x) + ", "
+ Float.toString(y) + ", "
+ Float.toString(z);
}

/** return the Euclidean distance from (0, 0, 0) */
public float distanceFromOrigin() {
return (float)Math.sqrt(x*x + y*y + z*z);
}

public int compareTo(Point3D other) {
float myDistance = distanceFromOrigin();
float otherDistance = other.distanceFromOrigin();

return Float.compare(myDistance, otherDistance);
}

public boolean equals(Object o) {
if (!(other instanceof Point3D)) {
return false;
}

Point3D other = (Point3D)o;
return this.x == other.x && this.y == other.y
&& this.z == other.z;
}

public int hashCode() {
return Float.floatToIntBits(x)
^ Float.floatToIntBits(y)
^ Float.floatToIntBits(z);
}
}

Listing 5.2: A WritableComparable version of Point3D

It is important for key types to implement hashCode() as well; the section on Partitioners later in this module explains why. The methods hashCode() and equals() have been provided in this version of the class as well.
Using Custom Types

Now that you have created a custom data type, Hadoop must be told to use it. You can control the output key or value data type for a job by using the setOutputKeyClass() and setOutputValueClass() methods of the JobConf object that defines your job. By default, this will set the types expected as output from both the map and reduce phases. If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer. The types delivered as input to the Mapper are governed by the InputFormat used; see the next section of this module for more details.
Faster Comparison Operations

The default sorting process for keys will read instances of the key type in from a stream, parsing the byte stream with the readFields() method of the key class, and then call the compareTo() method of the key class on the two objects. For faster performance, it may be possible to decide on an ordering between two keys just by looking at the byte streams and without parsing all of the data contained therein. For example, consider comparing strings of text. If characters are read in sequentially, then a decision can be made on their ordering as soon as a character position is found where the two strings differ. Even if all of the bytes for the object must be read in, the object itself does not necessarily need to be instantiated around those bytes. To support this higher-speed sorting mechanism, you can extend the WritableComparator class with a comparator specific to your own data type. In particular, the method which should be overridden is

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)

The default implementation is in the class org.apache.hadoop.io.WritableComparator. The relevant method has been reproduced here:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);

buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);

} catch (IOException e) {
throw new RuntimeException(e);
}

return compare(key1, key2); // compare them
}

Its operation is exactly as described above; it performs the straightforward comparison of the two objects after they have been individually deserialized from their separate byte streams (the b variables), which each have their own start offset (s) and length (l) attributes. Both objects must be fully constructed and deserialized before comparison can occur. The Text class, on the other hand, allows incremental comparison via its own implementation of this method. The code from org.apache.hadoop.io.Text is shown here:

/** A WritableComparator optimized for Text keys. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(Text.class);
}

public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int n1 = WritableUtils.decodeVIntSize(b1[s1]);
int n2 = WritableUtils.decodeVIntSize(b2[s2]);
return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
}
}

The Text object is serialized by first writing its length field to the byte stream, followed by the UTF-encoded string. The method decodeVIntSize determines the length of the integer describing the length of the byte stream. The comparator then skips these bytes, directly comparing the UTF-encoded bytes of the actual string-portion of the stream in the compareBytes() method. As soon as it finds a character in which the two streams differ, it returns a result without examining the rest of the strings.

Note that you do not need to manually specify this comparator's use in your Hadoop programs. Hadoop automatically uses this special comparator implementation for Text data due to the following code being added to Text's static initialization:

static {
// register this comparator
WritableComparator.define(Text.class, new Comparator());
}

Final Writable Notes

Defining custom writable types allows you to intelligently use Hadoop to manipulate higher-level data structures, without needing to use toString() to convert all your data types to text for sending over the network. If you will be using a type in a lot of MapReduce jobs, or you must process a very large volume of them (as is usually the case in Hadoop), defining your own data type classes will provide a significant performance benefit.

Exercise: Assume that we have a mapper which emits line segments as keys and values. A line segment is defined by its endpoints. For our purposes, line segments can be ordered by their lengths. Implement a LineSegment class which implements WritableComparable. Hint: make use of Point3D objects.