Hadoop will generate a large number of log files for a job, distributed across all the nodes that participated in the job's execution. Often only a subset of these logs is of interest when debugging failing tasks. MapReduce can help with this by running a user-provided script whenever a map or reduce task fails. The script is given the names of files containing the task's stdout and stderr, as well as the task's Hadoop log (syslog) and its job.xml file (i.e., its complete JobConf in serialized form).
These scripts run on whichever node the failing task ran on. You can use them to automate the work of inspecting only the failing tasks: e.g., email the stdout/stderr to an administrator's address, upload the failed task's log files to a common NFS-mounted "debug dump" directory, or preserve local state modifications made by map tasks.
Separate scripts can be provided for map and reduce task failures. Each receives as arguments, in order, the names of files containing the task's stdout, stderr, syslog, and jobconf. Because the scripts run on the task nodes rather than on the client machine where the job was submitted, they must be shipped to the nodes through the distributed cache described above.
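To make the argument order concrete, here is a minimal sketch of the logic a failure script might perform, written in Java to match the rest of this section (in practice you would likely write it as a shell script, or have a small shell wrapper invoke a class like this). The class name `DumpFailedTaskLogs`, the `debug.dump.dir` property, and the `/mnt/nfs/debug-dump` default are all hypothetical. It also assumes, without relying on it for correctness, that the stdout file sits in a directory named after the task attempt, and uses that directory's name as a per-task tag.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical debug action for a failed task. The four arguments arrive in the
// order stdout, stderr, syslog, jobconf; each file is copied into a per-task
// subdirectory of a shared "debug dump" directory.
final class DumpFailedTaskLogs {
    // Names given to the copies, in the same order as the script arguments.
    static final String[] LABELS = {"stdout", "stderr", "syslog", "job.xml"};

    static Path dump(String[] args, Path dumpRoot, String tag) throws IOException {
        if (args.length < LABELS.length) {
            throw new IllegalArgumentException("expected: stdout stderr syslog jobconf");
        }
        Path target = dumpRoot.resolve(tag);
        Files.createDirectories(target);
        for (int i = 0; i < LABELS.length; i++) {
            Path src = Paths.get(args[i]);
            // Skip missing files rather than failing the whole debug run.
            if (Files.isRegularFile(src)) {
                Files.copy(src, target.resolve(LABELS[i]), StandardCopyOption.REPLACE_EXISTING);
            }
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical shared mount; override with -Ddebug.dump.dir=...
        Path dumpRoot = Paths.get(System.getProperty("debug.dump.dir", "/mnt/nfs/debug-dump"));
        // Assumption: the stdout file's parent directory is named after the task attempt.
        Path parent = args.length > 0 ? Paths.get(args[0]).toAbsolutePath().getParent() : null;
        String tag = (parent != null && parent.getFileName() != null)
                ? parent.getFileName().toString() : "unknown-task";
        System.out.println("copied logs to " + dump(args, dumpRoot, tag));
    }
}
```

Copying under fixed names (stdout, stderr, syslog, job.xml) keeps every task's dump directory laid out identically, which makes it easy to compare several failed attempts side by side.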
The following method enables failed-task scripts on a MapReduce job that is being prepared. It assumes you have written two scripts (e.g., bash scripts) that perform your debug actions on the log filenames they are given (e.g., copy them to a shared NFS mount). In this example, the scripts are /home/aaron/src/map-fail and /home/aaron/src/reduce-fail.
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

// The enclosing class name is illustrative; only the constants and method matter.
public class FailedTaskScriptSetup {

  private static final String FAILED_MAP_SCRIPT_NAME = "map-fail";
  private static final String FAILED_REDUCE_SCRIPT_NAME = "reduce-fail";

  private static final String HDFS_SCRIPT_DIR = "/debug";

  private static final String HDFS_FAILED_MAP_SCRIPT =
      HDFS_SCRIPT_DIR + "/" + FAILED_MAP_SCRIPT_NAME;
  private static final String HDFS_FAILED_REDUCE_SCRIPT =
      HDFS_SCRIPT_DIR + "/" + FAILED_REDUCE_SCRIPT_NAME;

  private static final String LOCAL_FAILED_MAP_SCRIPT =
      "/home/aaron/src/" + FAILED_MAP_SCRIPT_NAME;
  private static final String LOCAL_FAILED_REDUCE_SCRIPT =
      "/home/aaron/src/" + FAILED_REDUCE_SCRIPT_NAME;

  public static void setupFailedTaskScript(JobConf conf) throws IOException {
    // create a directory on HDFS where we'll upload the fail scripts
    FileSystem fs = FileSystem.get(conf);
    Path debugDir = new Path(HDFS_SCRIPT_DIR);

    // who knows what's already in this directory; let's just clear it.
    if (fs.exists(debugDir)) {
      fs.delete(debugDir, true);
    }

    // ...and then make sure it exists again
    fs.mkdirs(debugDir);

    // upload the local scripts into HDFS
    fs.copyFromLocalFile(new Path(LOCAL_FAILED_MAP_SCRIPT),
        new Path(HDFS_FAILED_MAP_SCRIPT));
    fs.copyFromLocalFile(new Path(LOCAL_FAILED_REDUCE_SCRIPT),
        new Path(HDFS_FAILED_REDUCE_SCRIPT));

    // commands run on task failure, relative to the task's working directory
    conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
    conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
    // symlink each cached file into the working directory under its #fragment name
    DistributedCache.createSymlink(conf);

    URI fsUri = fs.getUri();

    // build the URI from a string so the '#' anchor is not escaped
    String mapUriStr = fsUri.toString() + HDFS_FAILED_MAP_SCRIPT
        + "#" + FAILED_MAP_SCRIPT_NAME;
    URI mapUri = null;
    try {
      mapUri = new URI(mapUriStr);
    } catch (URISyntaxException use) {
      throw new IOException(use);
    }
    DistributedCache.addCacheFile(mapUri, conf);

    String reduceUriStr = fsUri.toString() + HDFS_FAILED_REDUCE_SCRIPT
        + "#" + FAILED_REDUCE_SCRIPT_NAME;
    URI reduceUri = null;
    try {
      reduceUri = new URI(reduceUriStr);
    } catch (URISyntaxException use) {
      throw new IOException(use);
    }
    DistributedCache.addCacheFile(reduceUri, conf);
  }
}
How does this all work? The scripts are presumably hosted initially on the client machine that submits the job. The client is responsible for copying them into HDFS and registering them with the distributed cache. It first creates HDFS_SCRIPT_DIR (deleting any previous contents) and then uploads the local script files into this directory.
It must then tell the TaskTracker which commands to execute to run the scripts. This is done by the lines:
conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
DistributedCache.createSymlink(conf);
The distributed cache copies the files into mapred.local.dir on each task node, and the TaskTracker executes the appropriate script when a task fails. However, the TaskTracker does not run the script with its working directory set to mapred.local.dir. Fortunately, the distributed cache can be told to create symlinks in the task's working directory for files in the cache; the third line of the snippet above enables this. Now ./map-fail (that is, "./" + FAILED_MAP_SCRIPT_NAME) points to the copy of the script in the local cache directory, and the script can be run.
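The symlink mechanism can be illustrated with plain java.nio. This is a hypothetical simulation, not Hadoop's own code: it assumes the cache has already copied the script to some local directory, and it creates a link in a separate "working directory" named after the URI's "#" fragment. Returning null when there is no fragment reflects my understanding of the older DistributedCache API (only anchored URIs get a symlink); treat that detail as an assumption. The class name `CacheSymlinkSketch` and the `namenode:9000` address used in the test are illustrative.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical simulation of the symlink step: the cached copy lives in a
// local cache directory, and a link named after the URI's "#" fragment is
// created in the task's working directory, pointing at that copy.
final class CacheSymlinkSketch {
    // Returns the created link, or null when the URI carries no fragment.
    static Path link(URI cacheUri, Path localCopy, Path workDir) throws IOException {
        String linkName = cacheUri.getFragment();
        if (linkName == null) {
            return null;
        }
        Path link = workDir.resolve(linkName);
        // Removes a stale link itself, never the file it points to.
        Files.deleteIfExists(link);
        return Files.createSymbolicLink(link, localCopy.toAbsolutePath());
    }
}
```

After `link(...)` runs, resolving "./map-fail" against the working directory reaches the cached copy, which is exactly why the debug script command can be given as a relative path. Note that creating symlinks may fail on filesystems that do not support them.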
But before that can happen, we must add the files themselves to the distributed cache (so far they are only in HDFS). Ordinarily, we could just call DistributedCache.addCacheFile(new Path("hdfs_path_to_some_file").toUri(), conf) and that would be sufficient. But since we need symlinks, we must also tell the distributed cache how each symlink should be created: what filename it should take in the working directory. This is provided as the URI "anchor" (fragment), the part following the "#". A subtlety of Hadoop's Path class is that if you put a '#' in the path string, it URL-encodes it (as %23) and treats it as part of the filename. Therefore, we construct our URIs manually with java.net.URI to ensure that the '#' remains unescaped.
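The escaping behavior can be reproduced with java.net.URI alone. The sketch below compares three ways of building the map script's cache URI. The second mirrors what Path does internally, assuming (as in the Hadoop releases this section targets) that Path builds its URI through the multi-argument java.net.URI constructor; the third is one possible alternative to string concatenation, passing the anchor explicitly as the fragment argument. The class name `AnchorUriDemo` and the `namenode:9000` authority are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Compares three ways to build "hdfs://<namenode>/debug/map-fail#map-fail".
final class AnchorUriDemo {
    static final String NAMENODE = "namenode:9000"; // hypothetical NameNode address

    // 1. Single-string constructor (what setupFailedTaskScript does):
    //    the '#' is parsed as the start of the fragment.
    static URI parsed(String path, String anchor) throws URISyntaxException {
        return new URI("hdfs://" + NAMENODE + path + "#" + anchor);
    }

    // 2. Multi-argument constructor with the '#' inside the path component:
    //    it is quoted as %23 and becomes part of the file name.
    static URI quoted(String path, String anchor) throws URISyntaxException {
        return new URI("hdfs", NAMENODE, path + "#" + anchor, null, null);
    }

    // 3. Multi-argument constructor with the anchor passed as the fragment.
    static URI explicit(String path, String anchor) throws URISyntaxException {
        return new URI("hdfs", NAMENODE, path, null, anchor);
    }

    public static void main(String[] args) throws URISyntaxException {
        String path = "/debug/map-fail";
        String anchor = "map-fail";
        System.out.println(parsed(path, anchor).getFragment()); // map-fail
        System.out.println(quoted(path, anchor));               // hdfs://namenode:9000/debug/map-fail%23map-fail
        System.out.println(quoted(path, anchor).getFragment()); // null
        System.out.println(explicit(path, anchor).equals(parsed(path, anchor))); // true
    }
}
```

The third form yields the same URI as the first while letting java.net.URI handle quoting of any unusual characters in the path, so it could replace the string-building and parsing in setupFailedTaskScript if you prefer.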