source: gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest2.java@ 28312

Last change on this file since 28312 was 28312, checked in by jmt12, 11 years ago

Working on finer control over data locality - so I can configure a run with no locality, for example

File size: 28.6 KB
/** jmt12 **/
package org.nzdl.gsdl;

import org.nzdl.gsdl.GSGroupingComparator;
import org.nzdl.gsdl.GSInfoDB;
import org.nzdl.gsdl.GSPartitioner;

import java.io.*;
import java.lang.Iterable;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

38/** @class WordCount
39 */
40public class HadoopGreenstoneIngest2
41{
42
  /** @class GSFileRecordReader
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, Text>
  {
    /** The full (HDFS) path of the file to be processed */
    private Text current_key;

    private Text current_value = new Text("");

    /** Used to indicate progress */
    private boolean is_finished = false;

    /**
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     * We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     * Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     * Returns the current key (the path of the file to be processed)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (always the empty string, as the file is
     * handed to the mapper by path rather than by content)
     */
    @Override
    public Text getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

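  // Note: because GSFileRecordReader emits exactly one (path, "") record per
  // split, and GSFileInputFormat below marks files as unsplittable, each map
  // task ends up responsible for exactly one input file.
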
  /** @class GSFileInputFormat
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, Text>
  {

    private String[] getActiveServersList(JobContext context)
    {
      String[] servers = null;
      try
      {
        // The context's Configuration is not itself a JobConf under the new
        // API, so wrap it rather than casting it
        JobClient jc = new JobClient(new JobConf(context.getConfiguration()));
        ClusterStatus status = jc.getClusterStatus(true);
        Collection<String> atc = status.getActiveTrackerNames();
        servers = new String[atc.size()];
        int s = 0;
        for (String serverInfo : atc)
        {
          // Tracker names look like "tracker_<host>:<port>", so strip off the
          // leading "tracker_" and the trailing port to leave just the host
          StringTokenizer st = new StringTokenizer(serverInfo, ":");
          String trackerName = st.nextToken();
          StringTokenizer st1 = new StringTokenizer(trackerName, "_");
          st1.nextToken();
          servers[s++] = st1.nextToken();
        }
      }
      catch (IOException e)
      {
        e.printStackTrace();
      }
      if (servers != null)
      {
        System.err.print("Servers: ");
        String sep = "";
        for (Object obj : servers)
        {
          System.err.print(sep + obj.toString());
          sep = ", ";
        }
        System.err.println("");
      }
      return servers;
    }
    /** getActiveServersList() **/
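
    // The active-tracker list gathered above is currently only logged; it is
    // the hook point for the finer-grained data locality control this file is
    // working towards (for example, rewriting split locations so a run has no
    // locality at all).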

    /**
     */
    public List<InputSplit> getSplits(JobContext job)
      throws IOException
    {
      System.err.println("GSFileInputFormat::getSplits()");
      // get splits
      List<InputSplit> original_splits = super.getSplits(job);
      // Get active servers
      String[] servers = getActiveServersList(job);
      if (servers == null)
      {
        return null;
      }
      // done
      System.err.println("Splits: ");
      for (InputSplit obj : original_splits)
      {
        System.err.println(obj.toString());
      }
      return original_splits;
    }
    /** getSplits() **/

    /**
     * Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /**
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext content)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

  /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, Text, Text, Text>
  {
    /** @function map
     * The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix

      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so let's try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
            Thread.sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      double start_time = ((double)System.currentTimeMillis())/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(String.format("%.6f", start_time));
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      String cpu_number = "0";
      if (getcpu_executable.exists())
      {
        cpu_number = runCommand(getcpu_executable_cmd);
      }
      header_block.append(cpu_number);
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      //   existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

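      // The single-entry manifest above tells import.pl to process only this
      // map task's file; combined with -keepold below, each mapper adds its
      // documents to the shared archives directory without clobbering the
      // output of the other mappers.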
      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));

      // - redirect STDERR to STDOUT for simplicity's sake
      import_process_builder.redirectErrorStream(true);
      // Obsolete command to send output to file
      //import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      BufferedReader import_process_br = new BufferedReader(new InputStreamReader(import_process.getInputStream()));
      String line = "";
      Pattern open_tag_pattern = Pattern.compile("<InfoDBEntry type=\"(.+?)\" key=\"(.+?)\" mode=\"(.+?)\" timestamp=\"(.+?)\">(.*?(</InfoDBEntry>)?)");
      Pattern close_tag_pattern = Pattern.compile("(.*?)</InfoDBEntry>");
      while ((line = import_process_br.readLine()) != null)
      {
        // Write line to process log regardless
        fw1.write(line + "\n");
        // Now we check for sentinel strings in the output line
        Text output_key;
        Text output_value;
        // Watch for open entry tags
        Matcher open_tag_matcher = open_tag_pattern.matcher(line);
        if (open_tag_matcher.matches())
        {
          String entry_type = open_tag_matcher.group(1);
          String entry_key = open_tag_matcher.group(2);
          String entry_mode = open_tag_matcher.group(3);
          String entry_time = open_tag_matcher.group(4);
          String payload = open_tag_matcher.group(5);
          StringBuffer line_buffer = new StringBuffer();
          // Continue until we've found the close tag - or run out of output log
          Matcher close_tag_matcher = close_tag_pattern.matcher(payload);
          while (!close_tag_matcher.matches() && (line = import_process_br.readLine()) != null)
          {
            // append any existing payload to the buffer
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(payload);
            // store this line as the payload, should the match below fail
            payload = line;
            close_tag_matcher = close_tag_pattern.matcher(line);
          }
          // We've found the close tag (hopefully) so add last bit (possibly
          // empty string) to value
          if (close_tag_matcher.matches())
          {
            String last_payload = close_tag_matcher.group(1);
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(last_payload);
            last_payload = null;
          }
          close_tag_matcher = null;

          // Construct the compound key by which to sort the entries - note
          // that src is a little different than the others, as we don't care
          // about the timestamp at all - instead wanting to group the entries
          // by src file path (key)
          if (entry_type.equals("src"))
          {
            output_key = new Text(entry_type + " " + entry_key);
          }
          else
          {
            output_key = new Text(entry_type + " " + entry_time);
          }
          // Doc has its payload prefixed by key
          if (entry_type.equals("doc"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text("[" + entry_key + "]\n" + decoded_xml);
          }
          else if (entry_type.equals("src"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(entry_key + "|" + decoded_xml);
          }
          else if (entry_type.equals("rss"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(decoded_xml);
          }
          else
          {
            output_value = new Text(line_buffer.toString());
          }
          entry_type = null;
          entry_key = null;
          entry_mode = null;
          entry_time = null;
          payload = null;
          line_buffer = null;
        }
        // all other lines we'll key by host and time
        else
        {
          double timestamp = ((double)System.currentTimeMillis())/1000;
          String argument1 = hostname + ":" + cpu_number + ":" + String.format("%.6f", timestamp);
          output_key = new Text("msg " + argument1);
          output_value = new Text(argument1 + " " + line);
        }

        // Store the result in the return context
        context.write(output_key, output_value);

        // May as well do this here - indicate to the Hadoop framework that
        // this process is still making progress (of some form)
        context.progress();

        // Cleanup
        open_tag_matcher = null;
        output_key = null;
        output_value = null;
      }
      open_tag_pattern = null;
      close_tag_pattern = null;
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      //   more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      double end_time = ((double)System.currentTimeMillis())/1000;
      fw1.write("[Completed:" + String.format("%.6f", end_time) + "]\n");
      // - close our output to the log
      fw1.close();
    }
    /** map(Text,Text,Context) **/

  }
  /** class GSMap **/
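
  // Keys emitted by GSMap are compound strings of the form "<type> <argument>",
  // where <type> is an InfoDBEntry type such as "doc", "src", "rss" or
  // "datestamp", or "msg" for ordinary log lines. GSPartitioner and
  // GSGroupingComparator (separate classes in this package) control how these
  // compound keys are partitioned and grouped before they reach GSReducer below.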


  /** @class GSReducer
   */
  public static class GSReducer
    extends Reducer<Text, Text, Text, Text>
  {

    /** Prepare the Reducer by looking up configuration for this collection to
     * determine the appropriate tools to use to create databases.
     */
    public void setup(Context context)
    {
    }
    /** setup() **/

    /**
     */
    public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
    {
      System.err.println("reduce(" + key.toString() + ", <values>, <context>)");
      Configuration conf = context.getConfiguration();
      String gsdl_home = conf.get("gsdlhome");
      String collection = conf.get("collection");
      String archives_dir = gsdl_home + "/collect/" + collection + "/archives";
      //String archives_dir = conf.get("archivesdir");
      // Eventually I'd like to read in the database type from collect.cfg - or
      // maybe have it passed in as part of the context - but I'll hardcode
      // it as TDB for now as a proof of concept
      String infodbtype = "tdb";

      String key_string = key.toString();
      String[] key_parts = key_string.split(" ");
      if (key_parts.length >= 2)
      {
        String type = key_parts[0];
        String argument = key_parts[1];

        Iterator<Text> values_itr = values.iterator();
        // There are basically five different cases based on the key's type
        // - the first is datestamp... these are sorted earliest first, and
        //   since we only want the earliest we can ignore the rest!
        if (type.equals("datestamp"))
        {
          Text earliest_datestamp = values_itr.next();
          // Write this directly to file
          try
          {
            FileWriter earliest_datestamp_fout = new FileWriter(archives_dir + "/earliestDatestamp");
            earliest_datestamp_fout.write(earliest_datestamp.toString());
            earliest_datestamp_fout.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // For 'doc' types we open a pipe to the database creator and send all
        // the values through for processing
        else if (type.equals("doc"))
        {
          GSInfoDB archiveinf_doc = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-doc." + infodbtype);
          while (values_itr.hasNext())
          {
            Text db_entry = values_itr.next();
            archiveinf_doc.writeEntry(db_entry.toString());
          }
          archiveinf_doc.close();
        }
        // Similarly for 'src' types, except here we group all entries with the
        // same key
        else if (type.equals("src"))
        {
          // Sigh - you can only create the TDB file on a local filesystem, so
          // I have to create it here, and then move it into place when
          // finished
          GSInfoDB archiveinf_src = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-src." + infodbtype);
          String current_file_path = "";
          StringBuffer current_record = new StringBuffer();
          Pattern file_path_pattern = Pattern.compile("(.*?)\\|(.*)");
          while (values_itr.hasNext())
          {
            Text db_entry_raw = values_itr.next();
            String db_entry = db_entry_raw.toString();
            // Parse out the file path this entry refers to
            Matcher file_path_matcher = file_path_pattern.matcher(db_entry);
            if (file_path_matcher.matches())
            {
              String this_file_path = file_path_matcher.group(1);
              String this_record = file_path_matcher.group(2);
              // Output the record (if there is one) if the file path changes
              if (!this_file_path.equals(current_file_path) && current_record.length() > 0)
              {
                archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
                // store the next record's details
                current_file_path = this_file_path;
                current_record = new StringBuffer(this_record);
              }
              // Append onto our growing record
              else
              {
                current_file_path = this_file_path;
                if (current_record.length() > 0)
                {
                  current_record.append("\n");
                }
                current_record.append(this_record);
              }
            }
            else
            {
              // Not a valid src entry?
            }
          }
          if (!current_file_path.equals("") && current_record.length() > 0)
          {
            archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
          }
          archiveinf_src.close();
        }
        // For 'rss' we write all the entries - in order - to an XML file
        else if (type.equals("rss"))
        {
          try
          {
            FileWriter rss_item_rdf = new FileWriter(archives_dir + "/rss-items.rdf");
            while (values_itr.hasNext())
            {
              Text rss_entry = values_itr.next();
              rss_item_rdf.write(rss_entry.toString() + "\n");
            }
            rss_item_rdf.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // Everything else we assume are just process log messages - so get
        // Hadoop to write them, in order, to its log (I may need to annotate
        // these with the host so I can see which message came from which
        // compute node).
        else
        {
          Pattern p = Pattern.compile("([^\\s]+) (.*)");
          while (values_itr.hasNext())
          {
            String compound_value = values_itr.next().toString();
            Matcher m = p.matcher(compound_value);
            if (m.matches())
            {
              Text msg_key = new Text(m.group(1));
              Text msg_value = new Text(m.group(2));
              context.write(msg_key, msg_value);
            }
          }
        }
      }
      else
      {
        System.err.println("Error! Failed to parse key: " + key.toString());
      }
    }
    /** reduce(key, values, context) **/

  }
  /** class GSReducer **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 <gsdlhome> <hadoophome> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hadoophome", args[1]);
    conf.set("collection", args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix", args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin", args[5]);
    conf.set("hdfsout", args[6]);

    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);

    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest2.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Register the map, combiner, and reducer classes
    job.setMapperClass(GSMap.class);
    job.setPartitionerClass(GSPartitioner.class);
    job.setGroupingComparatorClass(GSGroupingComparator.class);
    job.setReducerClass(GSReducer.class);

    // Sets the input and output handlers - may need to adjust input to provide me
    // a series of filenames (TextInputFormat will instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    //job.setOutputFormatClass(NullOutputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    //   the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    //   this mechanism to produce a time based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my hatred of the ?: syntax
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  /** main(String[]) **/
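
  // A typical invocation (the paths shown here are illustrative only) supplies
  // the seven arguments in the order given in the usage message above, e.g.:
  //   bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
  //     /path/to/gsdl /path/to/hadoop mycollection /path/to/archives \
  //     HDThriftFS /user/me/hdfsin /user/me/hdfsout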

  /** @function copyFile(File, File)
   *
   * @author Josh Froelich @ stackoverflow.com
   */
  public static void copyFile(File sourceFile, File destFile)
    throws IOException
  {
    if (!destFile.exists())
    {
      destFile.createNewFile();
    }
    FileChannel source = null;
    FileChannel destination = null;
    try
    {
      source = new FileInputStream(sourceFile).getChannel();
      destination = new FileOutputStream(destFile).getChannel();
      destination.transferFrom(source, 0, source.size());
    }
    finally
    {
      if (source != null)
      {
        source.close();
      }
      if (destination != null)
      {
        destination.close();
      }
    }
  }
  /** copyFile(File, File) **/


  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! waitFor() is called before the output
   * is read, so this may block indefinitely if the command writes a large
   * amount of text to STDOUT or STDERR.
   *
   */
  public static String runCommand(String command)
  {
    ///ystem.err.println("[DEBUG] command: " + command);
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      pr.waitFor();
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    ///ystem.err.println("[DEBUG] result: " + result.toString());
    return result.toString();
  }
  /** runCommand() **/
}