The Reporter object passed to your Mapper and Reducer classes can be used to update counters. All Mapper and Reducer instances across the cluster increment the same set of named counters; the values are aggregated by the master node of the cluster, so updates are safe without any explicit synchronization on your part.
Counters are incremented through the Reporter.incrCounter() method. The names of the counters are defined as Java enums. The following example demonstrates how to count the number of "A" vs. "B" records seen by the mapper:
public class MyMapper extends MapReduceBase
    implements Mapper<Text, Text, Text, Text> {

  static enum RecordCounters { TYPE_A, TYPE_B, TYPE_UNKNOWN };

  // actual definitions elided
  public boolean isTypeARecord(Text input) { ... }
  public boolean isTypeBRecord(Text input) { ... }

  public void map(Text key, Text val, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    if (isTypeARecord(key)) {
      reporter.incrCounter(RecordCounters.TYPE_A, 1);
    } else if (isTypeBRecord(key)) {
      reporter.incrCounter(RecordCounters.TYPE_B, 1);
    } else {
      reporter.incrCounter(RecordCounters.TYPE_UNKNOWN, 1);
    }

    // actually process the record here, call
    // output.collect( .. ), etc.
  }
}
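Conceptually, each mapper or reducer task keeps its own enum-keyed tallies, and the master merges those per-task tallies into one set of global totals. The following plain-Java sketch models that aggregation with an EnumMap; it is illustrative only (the class name CounterModel and the merge helper are made up for this example and are not part of the Hadoop API):

```java
import java.util.EnumMap;
import java.util.Map;

public class CounterModel {
    enum RecordCounters { TYPE_A, TYPE_B, TYPE_UNKNOWN }

    // Merge one task's local counter tallies into the global totals,
    // analogous to how the master node aggregates per-task counters.
    static Map<RecordCounters, Long> merge(Map<RecordCounters, Long> global,
                                           Map<RecordCounters, Long> local) {
        for (Map.Entry<RecordCounters, Long> e : local.entrySet()) {
            global.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return global;
    }

    public static void main(String[] args) {
        Map<RecordCounters, Long> global = new EnumMap<>(RecordCounters.class);

        // Two mapper tasks report their local tallies.
        Map<RecordCounters, Long> task1 = new EnumMap<>(RecordCounters.class);
        task1.put(RecordCounters.TYPE_A, 3L);
        task1.put(RecordCounters.TYPE_B, 1L);

        Map<RecordCounters, Long> task2 = new EnumMap<>(RecordCounters.class);
        task2.put(RecordCounters.TYPE_A, 2L);
        task2.put(RecordCounters.TYPE_UNKNOWN, 4L);

        merge(global, task1);
        merge(global, task2);

        // Global totals now reflect both tasks: TYPE_A = 5, TYPE_B = 1,
        // TYPE_UNKNOWN = 4.
        System.out.println(global);
    }
}
```

Because the merging happens centrally rather than in the tasks themselves, individual tasks never contend for a shared variable.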
If you launch your job with JobClient.runJob(), the diagnostic information printed to stdout when the job completes includes the values of all the counters. Both runJob() and submitJob() return a RunningJob object that refers to the job in question. The RunningJob.getCounters() method returns a Counters object containing the values of all the counters, so you can query them programmatically; the Counters.getCounter(Enum key) method returns the value of a particular counter.
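Putting those retrieval calls together, the driver might look like the following sketch. It uses only the methods named above (JobClient.runJob(), RunningJob.getCounters(), Counters.getCounter(Enum)); the class name CounterQuery is made up for this example, the job configuration is elided, and the code needs the Hadoop libraries and a cluster to actually run:

```java
import java.io.IOException;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class CounterQuery {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(CounterQuery.class);
    // ... job configuration elided ...

    // runJob() blocks until the job completes and prints the counter
    // values to stdout as part of its diagnostic output.
    RunningJob job = JobClient.runJob(conf);

    // Query the same counters programmatically.
    Counters counters = job.getCounters();
    long typeA = counters.getCounter(MyMapper.RecordCounters.TYPE_A);
    System.out.println("TYPE_A records seen: " + typeA);
  }
}
```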