Big Data Analytics

Partitioning Data

"Partitioning" is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must decide, for every (key, value) pair it outputs, which reducer will receive that pair. For any given key, the destination partition must be the same regardless of which mapper instance generated it: if the key "cat" is generated in two separate (key, value) pairs, even by different mappers, both pairs must be reduced together. For performance reasons, it is also important that mappers can partition data independently; they should never need to exchange information with one another to determine the partition for a particular key.
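To see why the partition must depend only on the key, the following plain-Java sketch (hypothetical class and method names, not Hadoop code) simulates two mappers that share no state. A key-based function gives "cat" the same partition everywhere. A per-mapper round-robin counter, shown as a counterexample, assigns "cat" a partition that depends on what else that particular mapper emitted.

```java
import java.util.List;

// Hypothetical illustration in plain Java (not Hadoop code): two mappers route
// their output independently, with no shared state between them.
class PartitionDeterminism {

    // Key-based: a pure function of the key and the partition count, so every
    // mapper computes the same partition for "cat" without any coordination.
    static int keyBasedPartition(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Round-robin (counterexample): the partition depends on how many pairs this
    // particular mapper emitted before the target key, not on the key itself.
    static int roundRobinPartitionOf(String target, List<String> mapperOutput, int numPartitions) {
        int next = 0;  // per-mapper counter; each mapper starts its own at 0
        for (String key : mapperOutput) {
            if (key.equals(target)) {
                return next % numPartitions;
            }
            next++;
        }
        return -1;  // target was not emitted by this mapper
    }

    public static void main(String[] args) {
        List<String> mapperA = List.of("cat", "dog");  // output of one mapper
        List<String> mapperB = List.of("emu", "cat");  // output of another mapper
        int n = 4;

        // Key-based routing never looks at the rest of a mapper's output: prints 2.
        System.out.println(keyBasedPartition("cat", n));
        // Round-robin disagrees across mappers: prints "0 1".
        System.out.println(roundRobinPartitionOf("cat", mapperA, n) + " "
                + roundRobinPartitionOf("cat", mapperB, n));
    }
}
```

With round-robin routing, the two "cat" pairs would reach different reducers, which is exactly what the partitioning rule forbids.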

Hadoop uses an interface called Partitioner to determine which partition a (key, value) pair will go to. A single partition refers to all (key, value) pairs that will be sent to a single reduce task. When a job starts, Hadoop MapReduce determines how many partitions it will divide the data into: one per reduce task. If twenty reduce tasks are to be run (controlled by the JobConf.setNumReduceTasks() method), then twenty partitions must be filled.

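The Partitioner interface declares one method that must be implemented (implementations must also provide the configure(JobConf) method inherited from JobConfigurable):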

public interface Partitioner<K, V> extends JobConfigurable {
  int getPartition(K key, V value, int numPartitions);
}

The getPartition() method receives a key, a value, and the number of partitions to split the data across. It must return a number in the range [0, numPartitions), indicating which partition to send the key and value to. The result must also be consistent for equal keys: for any two keys k1 and k2, k1.equals(k2) implies getPartition(k1, *, n) == getPartition(k2, *, n), where * stands for any value.
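The two requirements on getPartition() (a result in [0, numPartitions), and the same result for equal keys whatever the value) can be checked mechanically. The sketch below is a hypothetical JDK-only harness: `SimplePartitioner` is an assumed stand-in that copies the getPartition() signature but drops Hadoop's JobConfigurable parent, and `PartitionContract.satisfiesContract` is an illustrative helper, not a Hadoop API.

```java
import java.util.List;

// Hypothetical stand-in for the getPartition() signature, without Hadoop's
// JobConfigurable parent, so the contract can be checked with the JDK alone.
interface SimplePartitioner<K, V> {
    int getPartition(K key, V value, int numPartitions);
}

class PartitionContract {

    // Returns true if, for every sample key, the partitioner (1) returns a value
    // in [0, numPartitions) and (2) returns the same value regardless of the value
    // paired with the key.
    static <K, V> boolean satisfiesContract(SimplePartitioner<K, V> p, List<K> keys,
                                            V v1, V v2, int numPartitions) {
        for (K key : keys) {
            int a = p.getPartition(key, v1, numPartitions);
            int b = p.getPartition(key, v2, numPartitions);
            if (a < 0 || a >= numPartitions) {
                return false;  // out of range
            }
            if (a != b) {
                return false;  // same key sent to two different partitions
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = List.of(0, 5, -7, 42);
        int n = 4;

        // Clears the sign bit, then takes the remainder: in range and key-only.
        SimplePartitioner<Integer, String> good = (k, v, parts) -> (k.hashCode() & Integer.MAX_VALUE) % parts;
        // Java's % keeps the sign of the dividend, so -7 % 4 == -3: out of range.
        SimplePartitioner<Integer, String> negative = (k, v, parts) -> k.hashCode() % parts;
        // Uses the value, so one key can land in different partitions.
        SimplePartitioner<Integer, String> valueBased = (k, v, parts) -> (v.hashCode() & Integer.MAX_VALUE) % parts;

        System.out.println(satisfiesContract(good, keys, "x", "y", n));        // true
        System.out.println(satisfiesContract(negative, keys, "x", "y", n));    // false
        System.out.println(satisfiesContract(valueBased, keys, "x", "y", n));  // false
    }
}
```

A check like this only samples a few keys, so passing it is evidence rather than proof that a partitioner is correct.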

The default Partitioner implementation is called HashPartitioner. It takes the key's hashCode(), clears the sign bit so the result is non-negative, and takes the remainder modulo the total number of partitions to choose the partition for a given (key, value) pair.
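The arithmetic can be reproduced with the JDK alone. The class below (`HashPartitionSketch`, a hypothetical name) is not Hadoop's HashPartitioner; it computes (key.hashCode() & Integer.MAX_VALUE) % numPartitions, the masking-then-remainder formula, and contrasts it with Math.abs(), a tempting but subtly wrong substitute. Because Integer.hashCode() returns the int value itself, the key Integer.MIN_VALUE exercises the edge case directly.

```java
// Plain-Java sketch of hash partitioning (not Hadoop's own class).
class HashPartitionSketch {

    // Clearing the sign bit makes the hash non-negative before the remainder,
    // so the result always lies in [0, numPartitions).
    static int hashPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Subtly wrong alternative: Math.abs(Integer.MIN_VALUE) is still
    // Integer.MIN_VALUE, so the remainder can come out negative.
    static int absPartition(Object key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        Integer worstCase = Integer.MIN_VALUE;  // its hashCode() is Integer.MIN_VALUE
        System.out.println(hashPartition(worstCase, 3));  // 0
        System.out.println(absPartition(worstCase, 3));   // -2, not a valid partition
    }
}
```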

For keys whose hash codes are roughly uniformly distributed, this should result in partitions of roughly equal size. If the data in your data set is skewed in some way, however, this might not produce good results. For example, if you know that the key 0 will appear much more frequently than any other key, then you may want to send all the 0-keyed data to one partition and distribute the other keys over the remaining partitions by their hashCode(). Also, if the hashCode() method for your data type does not provide uniformly distributed values over its range, then data may not be sent to reducers evenly. Poor partitioning means that some reducers receive more input than others, which usually means they have more work to do. The entire job then waits for the one reducer with the extra-large share of the load, when that work might have been spread across many different reducers.
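The skew-handling idea for a hot key 0 can be sketched in plain Java. Everything here is an assumption for illustration: `SkewAwarePartitioning` is a hypothetical class (not written against Hadoop's Partitioner interface), keys are Integers, key 0 is assumed to be known in advance as the hot key, and the sample data (key 0 repeated 1000 times plus keys 1 to 3000 once each) is synthetic. The sketch counts how many pairs each of four partitions would receive under plain hash partitioning versus the skew-aware scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical skew-aware partitioning for Integer keys (plain Java, not Hadoop code).
// Assumes key 0 is known in advance to be far more frequent than any other key.
class SkewAwarePartitioning {

    static int hashPartition(Integer key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Key 0 gets partition 0 to itself; every other key is hashed over 1..n-1.
    static int skewAwarePartition(Integer key, int numPartitions) {
        if (numPartitions == 1) {
            return 0;  // only one reducer: nothing to separate
        }
        if (key == 0) {
            return 0;
        }
        return 1 + (key.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }

    // Counts how many (key, value) pairs each partition would receive.
    static int[] partitionSizes(List<Integer> keys, int numPartitions, boolean skewAware) {
        int[] sizes = new int[numPartitions];
        for (Integer key : keys) {
            int p = skewAware ? skewAwarePartition(key, numPartitions)
                              : hashPartition(key, numPartitions);
            sizes[p]++;
        }
        return sizes;
    }

    // Synthetic sample: key 0 appears 1000 times, keys 1..3000 once each.
    static List<Integer> skewedSample() {
        List<Integer> keys = new ArrayList<>(Collections.nCopies(1000, 0));
        for (int k = 1; k <= 3000; k++) {
            keys.add(k);
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Integer> keys = skewedSample();
        System.out.println(Arrays.toString(partitionSizes(keys, 4, false)));  // [1750, 750, 750, 750]
        System.out.println(Arrays.toString(partitionSizes(keys, 4, true)));   // [1000, 1000, 1000, 1000]
    }
}
```

All 0-keyed pairs still reach a single reducer, as they must, but the other keys no longer pile onto that reducer, so the largest partition shrinks from 1750 to 1000 pairs in this sample.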

If you are dissatisfied with the performance of HashPartitioner, you are free to write your own Partitioner implementation. It can be general-purpose, or tailored to the specific data types or values that you expect to use in your application. After implementing the Partitioner interface, use the JobConf.setPartitionerClass() method to tell Hadoop to use it for your job.