source: gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest2.java @ 30354

Last change on this file since 30354 was 30354, checked in by jmt12, 8 years ago

Extending manifest v2 support to allow directories to be listed in the manifest. Matched with changes in the Directory plugin to allow paths into systems like HDFS to be listed in the manifest.
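For illustration only (this snippet is not part of the changeset), the per-task manifest written by GSMap below has the following shape; with this change the Filename element may now name a directory, such as a path into HDFS, rather than a single file (the path shown is a hypothetical placeholder):

<Manifest version="2.0">
	<Index>
		<Filename>hdfs://namenode/user/gsdl/import/some-directory</Filename>
	</Index>
</Manifest>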

File size: 29.5 KB
/** jmt12 **/
package org.nzdl.gsdl;

import org.nzdl.gsdl.GSGroupingComparator;
import org.nzdl.gsdl.GSInfoDB;
import org.nzdl.gsdl.GSPartitioner;

import java.io.*;
import java.lang.Iterable;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

38/** @class WordCount
39 */
public class HadoopGreenstoneIngest2
{

  /** @class GSFileRecordReader
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, Text>
  {
    /** Uncompressed file name */
    private Text current_key;

    private Text current_value = new Text("");

    /** Used to indicate progress */
    private boolean is_finished = false;

    /**
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     * We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     * Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     * Returns the current key (the full path of the file to be processed)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (always an empty string; only the path is used)
     */
    @Override
    public Text getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, Text>
  {

    private String[] getActiveServersList(JobContext context)
    {
      String [] servers = null;
      try
      {
        JobClient jc = new JobClient((JobConf)context.getConfiguration());
        ClusterStatus status = jc.getClusterStatus(true);
        Collection<String> atc = status.getActiveTrackerNames();
        servers = new String[atc.size()];
        int s = 0;
        for (String serverInfo : atc)
        {
          StringTokenizer st = new StringTokenizer(serverInfo, ":");
          String trackerName = st.nextToken();
          StringTokenizer st1 = new StringTokenizer(trackerName, "_");
          st1.nextToken();
          servers[s++] = st1.nextToken();
        }
      }
      catch (IOException e)
      {
        e.printStackTrace();
      }
      // If the cluster status lookup failed, servers is still null - bail out
      // here rather than falling into the debug loop below (which would NPE)
      if (servers == null)
      {
        return null;
      }
      System.err.print("Servers: ");
      String sep = "";
      for (Object obj : servers)
      {
        System.err.print(sep + obj.toString());
        sep = ", ";
      }
      System.err.println("");
      return servers;
    }
    /** getActiveServersList() **/

    /**
     */
    public List<InputSplit> getSplits(JobContext job)
      throws IOException
    {
      System.err.println("GSFileInputFormat::getSplits()");
      // get splits
      List<InputSplit> original_splits = super.getSplits(job);
      // Get active servers
      String[] servers = getActiveServersList(job);
      if(servers == null)
      {
        return null;
      }
      // done
      System.err.println("Splits: ");
      for (InputSplit obj : original_splits)
      {
        System.err.println(obj.toString());
      }
      return original_splits;
    }
    /** getSplits() **/

    /**
     * Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /**
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext content)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

  /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, Text, Text, Text>
  {
    /** @function map
     * The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix

      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so let's try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
            Thread.sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      double start_time = ((double)System.currentTimeMillis())/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(String.format("%.6f", start_time));
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      String cpu_number = "0";
      if (getcpu_executable.exists())
      {
        cpu_number = runCommand(getcpu_executable_cmd);
      }
      header_block.append(cpu_number);
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      // existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

      /* Original process calling - sets up environment in Java
      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");
      */

      /* New process call - adds call to setup.bash first to prepare
       * environment... hopefully */
      // - call Greenstone passing in the path to the manifest
      String environment_script_filename = "setup.bash";
      StringBuffer cmd_buffer = new StringBuffer();
      cmd_buffer.append("source ./");
      cmd_buffer.append(environment_script_filename);
      cmd_buffer.append(" && time -p import.pl -keepold -manifest \"");
      cmd_buffer.append(manifest_path.toString());
      cmd_buffer.append("\" -archivedir \"");
      cmd_buffer.append(conf.get("archivesdir"));
      cmd_buffer.append("\" ");
      cmd_buffer.append(collection);
      ProcessBuilder import_process_builder = new ProcessBuilder("bash", "-c", cmd_buffer.toString());
      fw1.write("[Command:" + import_process_builder.command() + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));

      // - redirect STDERR to STDOUT for simplicity sake
      import_process_builder.redirectErrorStream(true);
      // Obsolete command to send output to file
      //import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      BufferedReader import_process_br = new BufferedReader(new InputStreamReader(import_process.getInputStream()));
      String line = "";
      Pattern open_tag_pattern = Pattern.compile("<InfoDBEntry type=\"(.+?)\" key=\"(.+?)\" mode=\"(.+?)\" timestamp=\"(.+?)\">(.*?(</InfoDBEntry>)?)");
      Pattern close_tag_pattern = Pattern.compile("(.*?)</InfoDBEntry>");
      while ((line = import_process_br.readLine()) != null)
      {
        // Write line to process log regardless
        fw1.write(line + "\n");
        // Now we check for sentinel strings in the output line
        Text output_key;
        Text output_value;
        // Watch for open entry tags
        Matcher open_tag_matcher = open_tag_pattern.matcher(line);
        if(open_tag_matcher.matches())
        {
          String entry_type = open_tag_matcher.group(1);
          String entry_key = open_tag_matcher.group(2);
          String entry_mode = open_tag_matcher.group(3);
          String entry_time = open_tag_matcher.group(4);
          String payload = open_tag_matcher.group(5);
          StringBuffer line_buffer = new StringBuffer();
          // Continue until we've found the close tag - or run out of output log
          Matcher close_tag_matcher = close_tag_pattern.matcher(payload);
          while (!close_tag_matcher.matches() && (line = import_process_br.readLine()) != null)
          {
            // append any existing payload to the buffer
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(payload);
            // store this line as the payload, should the match below fail
            payload = line;
            close_tag_matcher = close_tag_pattern.matcher(line);
          }
          // We've found the close tag (hopefully) so add last bit (possibly
          // empty string) to value
          if (close_tag_matcher.matches())
          {
            String last_payload = close_tag_matcher.group(1);
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(last_payload);
            last_payload = null;
          }
          close_tag_matcher = null;

          // Construct the compound key by which to sort the entries - note
          // that src is a little different than the others, as we don't care
          // about the timestamp at all - instead wanting to group the entries
          // by src file path (key)
          if (entry_type.equals("src"))
          {
            output_key = new Text(entry_type + " " + entry_key);
          }
          else
          {
            output_key = new Text(entry_type + " " + entry_time);
          }
          // Doc has its payload prefixed by key
          if (entry_type.equals("doc"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text("[" + entry_key + "]\n" + decoded_xml);
          }
          else if (entry_type.equals("src"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(entry_key + "|" + decoded_xml);
          }
          else if (entry_type.equals("rss"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(decoded_xml);
          }
          else
          {
            output_value = new Text(line_buffer.toString());
          }
          entry_type = null;
          entry_key = null;
          entry_mode = null;
          entry_time = null;
          payload = null;
          line_buffer = null;
        }
        // all other lines we'll key by host and time
        else
        {
          double timestamp = ((double)System.currentTimeMillis())/1000;
          String argument1 = hostname + ":" + cpu_number + ":" + String.format("%.6f", timestamp);
          output_key = new Text("msg " + argument1);
          output_value = new Text(argument1 + " " + line);
        }

        // Store the result in the return context
        context.write(output_key, output_value);

        // May as well do this here - indicate to the Hadoop framework that
        // this process is still making progress (of some form)
        context.progress();

        // Cleanup
        open_tag_matcher = null;
        output_key = null;
        output_value = null;
      }
      open_tag_pattern = null;
      close_tag_pattern = null;
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      // more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      double end_time = ((double)System.currentTimeMillis())/1000;
      fw1.write("[Completed:" + String.format("%.6f", end_time) + "]\n");
      // - close our output to the log
      fw1.close();
    }
    /** map(Text,Text,Context) **/

  }
  /** class GSMap **/


  /** @class GSReducer
   */
  public static class GSReducer
    extends Reducer<Text, Text, Text, Text>
  {


    /** Prepare the Reducer by looking up configuration for this collection to
     * determine the appropriate tools to use to create databases.
     */
    public void setup (Context context)
    {
    }
    /** setup() **/


    /**
     */
    public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
    {
      System.err.println("reduce(" + key.toString() + ", <values>, <context>)");
      Configuration conf = context.getConfiguration();
      String gsdl_home = conf.get("gsdlhome");
      String collection = conf.get("collection");
      String archives_dir = gsdl_home + "/collect/" + collection + "/archives";
      //String archives_dir = conf.get("archivesdir");
      // Eventually I'd like to read in the database type from collect.cfg - or
      // maybe have it passed in as part of the context - but I'll hardcode
      // it as TDB for now as a proof of concept
      String infodbtype = "tdb";

      String key_string = key.toString();
      String[] key_parts = key_string.split(" ");
      if (key_parts.length >= 2)
      {
        String type = key_parts[0];
        String argument = key_parts[1];

        Iterator<Text> values_itr = values.iterator();
        // There are basically five different cases based on the key's type
        // - the first is datestamp... these are sorted earliest first, and
        // since we only want the earliest we can ignore the rest!
        if (type.equals("datestamp"))
        {
          Text earliest_datestamp = values_itr.next();
          // Write this directly to file
          try
          {
            FileWriter earliest_datestamp_fout = new FileWriter(archives_dir + "/earliestDatestamp");
            earliest_datestamp_fout.write(earliest_datestamp.toString());
            earliest_datestamp_fout.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // For 'doc' types we open a pipe to the database creator and send all
        // the values through for processing
        else if (type.equals("doc"))
        {
          GSInfoDB archiveinf_doc = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-doc." + infodbtype);
          while (values_itr.hasNext())
          {
            Text db_entry = values_itr.next();
            archiveinf_doc.writeEntry(db_entry.toString());
          }
          archiveinf_doc.close();
        }
        // Similarly for 'src' types, except here we group all entries with the
        // same key
        else if (type.equals("src"))
        {
          // Sigh - you can only create the TDB file on a local
          // filesystem, so I have to create it here, and then move it into
          // place when finished
          GSInfoDB archiveinf_src = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-src." + infodbtype);
          String current_file_path = "";
          StringBuffer current_record = new StringBuffer();
          Pattern file_path_pattern = Pattern.compile("(.*?)\\|(.*)");
          while (values_itr.hasNext())
          {
            Text db_entry_raw = values_itr.next();
            String db_entry = db_entry_raw.toString();
            // Parse out the file path this entry refers to
            Matcher file_path_matcher = file_path_pattern.matcher(db_entry);
            if (file_path_matcher.matches())
            {
              String this_file_path = file_path_matcher.group(1);
              String this_record = file_path_matcher.group(2);
              // Output the record (if there is one) if the file path changes
              if (!this_file_path.equals(current_file_path) && current_record.length() > 0)
              {
                archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
                // store the next record's details
                current_file_path = this_file_path;
                current_record = new StringBuffer(this_record);
              }
              // Append onto our growing record
              else
              {
                current_file_path = this_file_path;
                if (current_record.length() > 0)
                {
                  current_record.append("\n");
                }
                current_record.append(this_record);
              }
            }
            else
            {
              // Not a valid src entry?
            }
          }
          if (!current_file_path.equals("") && current_record.length() > 0)
          {
            archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
          }
          archiveinf_src.close();
        }
        // For 'rss' we write all the entries - in order - to an XML file
        else if (type.equals("rss"))
        {
          try
          {
            FileWriter rss_item_rdf = new FileWriter(archives_dir + "/rss-items.rdf");
            while (values_itr.hasNext())
            {
              Text rss_entry = values_itr.next();
              rss_item_rdf.write(rss_entry.toString() + "\n");
            }
            rss_item_rdf.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // Everything else we assume are just process log messages - so get
        // Hadoop to write them, in order, to the log (I may need to annotate
        // these with the host so I can see which message came from which
        // compute node).
        else
        {
          // Compile the message pattern once, outside the loop
          Pattern message_pattern = Pattern.compile("([^\\s]+) (.*)");
          while (values_itr.hasNext())
          {
            String compound_value = values_itr.next().toString();
            Matcher m = message_pattern.matcher(compound_value);
            if (m.matches())
            {
              Text msg_key = new Text(m.group(1));
              Text msg_value = new Text(m.group(2));
              context.write(msg_key, msg_value);
            }
          }
        }
      }
      else
      {
        System.err.println("Error! Failed to parse key: " + key.toString());
      }
    }
    /** reduce(key, value, context) **/

  }
  /** class GSReducer **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 <gsdlhome> <hadoop home> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hadoophome", args[1]);
    conf.set("collection", args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix", args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin", args[5]);
    conf.set("hdfsout", args[6]);

    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);

    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest2.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Register the map, partitioner, grouping comparator, and reducer classes
    job.setMapperClass(GSMap.class);
    job.setPartitionerClass(GSPartitioner.class);
    job.setGroupingComparatorClass(GSGroupingComparator.class);
    job.setReducerClass(GSReducer.class);

    // Sets the input and output handlers - may need to adjust input to provide
    // a series of filenames (TextInputFormat will instead read in a text file
    // and return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    //job.setOutputFormatClass(NullOutputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    // the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    // this mechanism to produce a time based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my hatred of the ?: syntax
    System.exit(job.waitForCompletion(true)?0:1);
  }
  /** main(String[]) **/

  /** @function copyFile(File, File)
   *
   * @author Josh Froelich @ stackoverflow.com
   */
  public static void copyFile(File sourceFile, File destFile)
    throws IOException
  {
    if(!destFile.exists())
    {
      destFile.createNewFile();
    }
    FileChannel source = null;
    FileChannel destination = null;
    try
    {
      source = new FileInputStream(sourceFile).getChannel();
      destination = new FileOutputStream(destFile).getChannel();
      destination.transferFrom(source, 0, source.size());
    }
    finally
    {
      if(source != null)
      {
        source.close();
      }
      if(destination != null)
      {
        destination.close();
      }
    }
  }
  /** copyFile(File, File) **/


  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! Not safe if the command could return a
   * large amount of text in the STDERR stream - may infinitely block.
   *
   */
  public static String runCommand(String command)
  {
    ///ystem.err.println("[DEBUG] command: " + command);
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime() ;
      Process pr = run.exec(command) ;
      pr.waitFor() ;
      BufferedReader buf = new BufferedReader( new InputStreamReader( pr.getInputStream() ) ) ;
      String line;
      while ( ( line = buf.readLine() ) != null )
      {
        result.append(line);
      }
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    ///ystem.err.println("[DEBUG] result: " + result.toString());
    return result.toString();
  }
  /** runCommand() **/
}
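
For reference, a hedged example of launching the job with the seven arguments read in main(); the jar and class names follow the usage string above, the hdfsprefix value comes from the comment in main(), and every path and the collection name are illustrative placeholders only:

bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
  /opt/greenstone2 /opt/hadoop demo /opt/greenstone2/collect/demo/archives \
  HDThriftFS /user/gsdl/hdfsin /user/gsdl/hdfsout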