/** jmt12 **/
package org.nzdl.gsdl;

import org.nzdl.gsdl.GSGroupingComparator;
import org.nzdl.gsdl.GSInfoDB;
import org.nzdl.gsdl.GSPartitioner;

import java.io.*;
import java.lang.Iterable;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

/** @class HadoopGreenstoneIngest2 */
public class HadoopGreenstoneIngest2
{
  /** @class GSFileRecordReader */
  public static class GSFileRecordReader extends RecordReader<Text, Text>
  {
    /** Uncompressed file name */
    private Text current_key;

    private Text current_value = new Text("");

    /** Used to indicate progress */
    private boolean is_finished = false;

    /** */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    } /** initialize() **/

    /** We only ever have a single key/value */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    } /** nextKeyValue() **/

    /** @function getProgress
     *  Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    } /** getProgress() **/

    /** Returns the current key (the path of the file) */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    } /** getCurrentKey() **/

    /** Returns the current value (always the empty string) */
    @Override
    public Text getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    } /** getCurrentValue() **/

    /** Close quietly, ignoring any exceptions */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    } /** close() **/
  } /** GSFileRecordReader **/

  /** @class GSFileInputFormat */
  public static class GSFileInputFormat extends FileInputFormat<Text, Text>
  {
    /** Don't split the files */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    } /** isSplitable() **/

    /** */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    } /** createRecordReader() **/
  } /** class GSFileInputFormat **/

  /** @class GSMap */
  public static class GSMap extends Mapper<Text, Text, Text, Text>
  {
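    // Because GSFileInputFormat never splits its input and GSFileRecordReader
    // emits exactly one record per file, each call to map() below receives a
    // single key (the HDFS path of one file to import) and an empty value.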
    /** @function map
     *  The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix

      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if the path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);

      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so let's try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
            Thread.currentThread().sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      double start_time = ((double)System.currentTimeMillis())/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(String.format("%.6f", start_time));
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      String cpu_number = "0";
      if (getcpu_executable.exists())
      {
        cpu_number = runCommand(getcpu_executable_cmd);
      }
      header_block.append(cpu_number);
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      //   existing file
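      // Illustrative example only: for a file_path of
      // "/hdfs/user/gsdl/collect/demo/import/b17mie.htm" the manifest written
      // below would contain:
      //   <Manifest>
      //     <Index>
      //       <Filename>/hdfs/user/gsdl/collect/demo/import/b17mie.htm</Filename>
      //     </Index>
      //   </Manifest>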
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      // The XML tags below were missing from this copy of the source and have
      // been restored following Greenstone's manifest file format
      manifest_writer.write("<Manifest>\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");

      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));
      // - redirect STDERR to STDOUT for simplicity's sake
      import_process_builder.redirectErrorStream(true);
      // Obsolete command to send output to file
      //import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      BufferedReader import_process_br = new BufferedReader(new InputStreamReader(import_process.getInputStream()));

      String line = "";
      // NOTE: the literal sentinel tags in the two patterns below were lost
      // from this copy of the source; the reconstruction here assumes import.pl
      // wraps each entry as:
      //   <Entry type="..." key="..." mode="..." time="...">payload</Entry>
      // Adjust the tag and attribute names to match the actual import.pl output.
      Pattern open_tag_pattern = Pattern.compile("<Entry type=\"([^\"]+)\" key=\"([^\"]*)\" mode=\"([^\"]*)\" time=\"([^\"]*)\">(.*?(</Entry>)?)");
      Pattern close_tag_pattern = Pattern.compile("(.*?)</Entry>");
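      // Hypothetical example (using the assumed sentinel format above): a line
      // such as
      //   <Entry type="doc" key="HASH01d7aa0d" mode="add" time="1370305108.123456">&lt;docNode&gt;...&lt;/docNode&gt;</Entry>
      // would be emitted below under the compound key "doc 1370305108.123456",
      // with its XML entities decoded back into plain characters.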
      while ((line = import_process_br.readLine()) != null)
      {
        // Write line to process log regardless
        fw1.write(line + "\n");

        // Now we check for sentinel strings in the output line
        Text output_key;
        Text output_value;

        // Watch for open entry tags
        Matcher open_tag_matcher = open_tag_pattern.matcher(line);
        if (open_tag_matcher.matches())
        {
          String entry_type = open_tag_matcher.group(1);
          String entry_key = open_tag_matcher.group(2);
          String entry_mode = open_tag_matcher.group(3);
          String entry_time = open_tag_matcher.group(4);
          String payload = open_tag_matcher.group(5);
          StringBuffer line_buffer = new StringBuffer();

          // Continue until we've found the close tag - or run out of output log
          Matcher close_tag_matcher = close_tag_pattern.matcher(payload);
          while (!close_tag_matcher.matches() && (line = import_process_br.readLine()) != null)
          {
            // append any existing payload to the buffer
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(payload);
            // store this line as the payload, should the match below fail
            payload = line;
            close_tag_matcher = close_tag_pattern.matcher(line);
          }
          // We've found the close tag (hopefully) so add the last bit (possibly
          // an empty string) to the value
          if (close_tag_matcher.matches())
          {
            String last_payload = close_tag_matcher.group(1);
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(last_payload);
            last_payload = null;
          }
          close_tag_matcher = null;

          // Construct the compound key by which to sort the entries - note
          // that src is a little different than the others, as we don't care
          // about the timestamp at all - instead wanting to group the entries
          // by src file path (key)
          if (entry_type.equals("src"))
          {
            output_key = new Text(entry_type + " " + entry_key);
          }
          else
          {
            output_key = new Text(entry_type + " " + entry_time);
          }

          // Doc has its payload prefixed by key
          if (entry_type.equals("doc"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;", "\"");
            decoded_xml = decoded_xml.replace("&apos;", "'");
            decoded_xml = decoded_xml.replace("&lt;", "<");
            decoded_xml = decoded_xml.replace("&gt;", ">");
            decoded_xml = decoded_xml.replace("&amp;", "&");
            output_value = new Text("[" + entry_key + "]\n" + decoded_xml);
          }
          else if (entry_type.equals("src"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;", "\"");
            decoded_xml = decoded_xml.replace("&apos;", "'");
            decoded_xml = decoded_xml.replace("&lt;", "<");
            decoded_xml = decoded_xml.replace("&gt;", ">");
            decoded_xml = decoded_xml.replace("&amp;", "&");
            output_value = new Text(entry_key + "|" + decoded_xml);
          }
          else if (entry_type.equals("rss"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;", "\"");
            decoded_xml = decoded_xml.replace("&apos;", "'");
            decoded_xml = decoded_xml.replace("&lt;", "<");
            decoded_xml = decoded_xml.replace("&gt;", ">");
            decoded_xml = decoded_xml.replace("&amp;", "&");
            output_value = new Text(decoded_xml);
          }
          else
          {
            output_value = new Text(line_buffer.toString());
          }

          entry_type = null;
          entry_key = null;
          entry_mode = null;
          entry_time = null;
          payload = null;
          line_buffer = null;
        }
        // all other lines we'll key by host and time
        else
        {
          double timestamp = ((double)System.currentTimeMillis())/1000;
          String argument1 = hostname + ":" + cpu_number + ":" + String.format("%.6f", timestamp);
          output_key = new Text("msg " + argument1);
          output_value = new Text(argument1 + " " + line);
        }

        // Store the result in the return context
        context.write(output_key, output_value);

        // May as well do this here - indicate to the Hadoop framework that
        // this process is still making progress (of some form)
        context.progress();

        // Cleanup
        open_tag_matcher = null;
        output_key = null;
        output_value = null;
      }
      open_tag_pattern = null;
      close_tag_pattern = null;
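      // Every line of import.pl output has now been emitted under a compound
      // key: either "<entry type> <timestamp or src path>" for sentinel-wrapped
      // entries (GSReducer specifically handles "datestamp", "doc", "src" and
      // "rss"), or "msg <host:cpu:timestamp>" for ordinary log lines.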
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      //   more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      double end_time = ((double)System.currentTimeMillis())/1000;
      fw1.write("[Completed:" + String.format("%.6f", end_time) + "]\n");

      // - close our output to the log
      fw1.close();
    } /** map(Text,Text,Context) **/
  } /** class GSMap **/

  /** @class GSReducer */
  public static class GSReducer extends Reducer<Text, Text, Text, Text>
  {
    /** Prepare the Reducer by looking up configuration for this collection to
     *  determine the appropriate tools to use to create databases.
     */
    public void setup(Context context)
    {
    } /** setup() **/

    /** */
    public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
    {
      System.err.println("reduce(" + key.toString() + ", , )");
      Configuration conf = context.getConfiguration();
      String gsdl_home = conf.get("gsdlhome");
      String collection = conf.get("collection");
      String archives_dir = gsdl_home + "/collect/" + collection + "/archives";
      //String archives_dir = conf.get("archivesdir");
      // Eventually I'd like to read in the database type from collect.cfg - or
      // maybe have it passed in as part of the context - but I'll hardcode it
      // as TDB for now as a proof of concept
      String infodbtype = "tdb";

      String key_string = key.toString();
      String[] key_parts = key_string.split(" ");
      if (key_parts.length >= 2)
      {
        String type = key_parts[0];
        String argument = key_parts[1];

        Iterator<Text> values_itr = values.iterator();
        // There are basically five different cases based on the key's type
        // - the first is datestamp... these are sorted earliest first, and
        //   since we only want the earliest we can ignore the rest!
        if (type.equals("datestamp"))
        {
          Text earliest_datestamp = values_itr.next();
          // Write this directly to file
          try
          {
            FileWriter earliest_datestamp_fout = new FileWriter(archives_dir + "/earliestDatestamp");
            earliest_datestamp_fout.write(earliest_datestamp.toString());
            earliest_datestamp_fout.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // For 'doc' types we open a pipe to the database creator and send all
        // the values through for processing
        else if (type.equals("doc"))
        {
          GSInfoDB archiveinf_doc = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-doc." + infodbtype);
          while (values_itr.hasNext())
          {
            Text db_entry = values_itr.next();
            archiveinf_doc.writeEntry(db_entry.toString());
          }
          archiveinf_doc.close();
        }
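        // Note: GSMap emits 'src' values as "file_path|record" pairs, which is
        // why the next branch splits each value on the first '|' and groups the
        // records by file path before writing them to archiveinf-src.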
        // Similarly for 'src' types, except here we group all entries with the
        // same key
        else if (type.equals("src"))
        {
          // Sigh - you can only create the TDB file on a local filesystem, so I
          // have to create it here, and then move it into place when finished
          GSInfoDB archiveinf_src = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-src." + infodbtype);
          String current_file_path = "";
          StringBuffer current_record = new StringBuffer();
          Pattern file_path_pattern = Pattern.compile("(.*?)\\|(.*)");
          while (values_itr.hasNext())
          {
            Text db_entry_raw = values_itr.next();
            String db_entry = db_entry_raw.toString();
            // Parse out the file path this entry refers to
            Matcher file_path_matcher = file_path_pattern.matcher(db_entry);
            if (file_path_matcher.matches())
            {
              String this_file_path = file_path_matcher.group(1);
              String this_record = file_path_matcher.group(2);
              // Output the record (if there is one) if the file path changes
              if (!this_file_path.equals(current_file_path) && current_record.length() > 0)
              {
                archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
                // store the next record's details
                current_file_path = this_file_path;
                current_record = new StringBuffer(this_record);
              }
              // Append onto our growing record
              else
              {
                current_file_path = this_file_path;
                if (current_record.length() > 0)
                {
                  current_record.append("\n");
                }
                current_record.append(this_record);
              }
            }
            else
            {
              // Not a valid src entry?
            }
          }
          if (!current_file_path.equals("") && current_record.length() > 0)
          {
            archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
          }
          archiveinf_src.close();
        }
        // For 'rss' we write all the entries - in order - to an XML file
        else if (type.equals("rss"))
        {
          try
          {
            FileWriter rss_item_rdf = new FileWriter(archives_dir + "/rss-items.rdf");
            while (values_itr.hasNext())
            {
              Text rss_entry = values_itr.next();
              rss_item_rdf.write(rss_entry.toString() + "\n");
            }
            rss_item_rdf.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // Everything else we assume to be process log messages - so get Hadoop
        // to write them, in order, to its log (I may need to annotate these
        // with the host so I can see which message came from which compute
        // node).
        else
        {
          // Each value is a "<host:cpu:timestamp> <message>" pair, so split off
          // the prefix and write the message keyed by it
          Pattern msg_pattern = Pattern.compile("([^\\s]+) (.*)");
          while (values_itr.hasNext())
          {
            String compound_value = values_itr.next().toString();
            Matcher m = msg_pattern.matcher(compound_value);
            if (m.matches())
            {
              Text msg_key = new Text(m.group(1));
              Text msg_value = new Text(m.group(2));
              context.write(msg_key, msg_value);
            }
          }
        }
      }
      else
      {
        System.err.println("Error! Failed to parse key: " + key.toString());
      }
    } /** reduce(key, values, context) **/
  } /** class GSReducer **/
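  // Example invocation (all paths and host names below are illustrative only):
  //   bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
  //     /home/gsdl/gsdl2 /opt/hadoop demo /home/gsdl/gsdl2/collect/demo/archives \
  //     /hdfs hdfs://namenode:9000/user/gsdl/demo-filelist.txt \
  //     hdfs://namenode:9000/user/gsdl/demo-ingest-out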
  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 <gsdlhome> <hadoophome> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hadoophome", args[1]);
    conf.set("collection", args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix", args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin", args[5]);
    conf.set("hdfsout", args[6]);

    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1);    // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1);  // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);

    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest2.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Register the map, partitioner, grouping comparator, and reducer classes
    job.setMapperClass(GSMap.class);
    job.setPartitionerClass(GSPartitioner.class);
    job.setGroupingComparatorClass(GSGroupingComparator.class);
    job.setReducerClass(GSReducer.class);

    // Set the input and output handlers - may need to adjust input to provide me
    // a series of filenames (TextInputFormat would instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    //job.setOutputFormatClass(NullOutputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    //   the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    //   this mechanism to produce a time-based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my distaste for the ?: syntax
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  } /** main(String[]) **/

  /** @function copyFile(File, File)
   *
   * @author Josh Froelich @ stackoverflow.com
   */
  public static void copyFile(File sourceFile, File destFile)
    throws IOException
  {
    if (!destFile.exists())
    {
      destFile.createNewFile();
    }
    FileChannel source = null;
    FileChannel destination = null;
    try
    {
      source = new FileInputStream(sourceFile).getChannel();
      destination = new FileOutputStream(destFile).getChannel();
      destination.transferFrom(source, 0, source.size());
    }
    finally
    {
      if (source != null)
      {
        source.close();
      }
      if (destination != null)
      {
        destination.close();
      }
    }
  } /** copyFile(File, File) **/
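  // Usage note: GSMap.map() above relies on runCommand() below to capture the
  // output of the getcpu helper, e.g.
  //   String cpu_number = runCommand(gsdlhome + "/ext/parallel-building/linux/bin/getcpu");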
  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! Not safe if the command could return a
   * large amount of text in the STDERR stream - may infinitely block.
   */
  public static String runCommand(String command)
  {
    ///ystem.err.println("[DEBUG] command: " + command);
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      pr.waitFor();
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    ///ystem.err.println("[DEBUG] result: " + result.toString());
    return result.toString();
  } /** runCommand() **/
}