source: gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest2.java@ 28012

Last change on this file since 28012 was 28012, checked in by jmt12, 11 years ago

Express start time as a double as well

File size: 26.8 KB
/** jmt12 **/
package org.nzdl.gsdl;

import org.nzdl.gsdl.GSGroupingComparator;
import org.nzdl.gsdl.GSInfoDB;
import org.nzdl.gsdl.GSPartitioner;

import java.io.*;
import java.lang.Iterable;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

32/** @class WordCount
33 */
34public class HadoopGreenstoneIngest2
35{

  /** @class GSFileRecordReader
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, Text>
  {
    /** The full path of the file this split represents (used as the key) */
    private Text current_key;

    private Text current_value = new Text("");

    /** Used to indicate progress */
    private boolean is_finished = false;

    /** Record the path of the file for this split as the current key
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     * We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     * Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     * Returns the current key (the full path of the file to process)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (always an empty Text - the mapper only needs the path)
     */
    @Override
    public Text getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, Text>
  {
    /**
     * Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /** Hand each (unsplit) file to a GSFileRecordReader
     */
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/
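
  // Because isSplitable() returns false and GSFileRecordReader emits exactly
  // one key/value pair per split, each file found under the HDFS input path
  // becomes a single map task whose key is that file's path.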

  /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, Text, Text, Text>
  {
    /** @function map
     * The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix

      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so let's try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds\n");
            Thread.sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      double start_time = ((double)System.currentTimeMillis())/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(String.format("%.6f", start_time));
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      String cpu_number = "0";
      if (getcpu_executable.exists())
      {
        cpu_number = runCommand(getcpu_executable_cmd);
      }
      header_block.append(cpu_number);
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      //   existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();
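
      // The manifest written above names exactly one file, so the import.pl
      // invocation below only processes the file assigned to this map task.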

      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));
      // - close our output to the log before opening in the process
      fw1.close();

      // - redirect STDERR to STDOUT for simplicity's sake
      import_process_builder.redirectErrorStream(true);
      // Obsolete command to send output to file
      //import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      BufferedReader import_process_br = new BufferedReader(new InputStreamReader(import_process.getInputStream()));
      String line = "";
      Pattern open_tag_pattern = Pattern.compile("<InfoDBEntry type=\"(.+?)\" key=\"(.+?)\" mode=\"(.+?)\" timestamp=\"(.+?)\">(.*?(</InfoDBEntry>)?)");
      Pattern close_tag_pattern = Pattern.compile("(.*?)</InfoDBEntry>");
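      // An <InfoDBEntry> record emitted by the import process may span several
      // lines of output, so the open-tag pattern optionally captures a closing
      // tag on the same line and the loop below keeps reading until one is
      // matched.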
      while ((line = import_process_br.readLine()) != null)
      {
        Text output_key;
        Text output_value;
        // Watch for open entry tags
        Matcher open_tag_matcher = open_tag_pattern.matcher(line);
        if (open_tag_matcher.matches())
        {
          String entry_type = open_tag_matcher.group(1);
          String entry_key = open_tag_matcher.group(2);
          String entry_mode = open_tag_matcher.group(3);
          String entry_time = open_tag_matcher.group(4);
          String payload = open_tag_matcher.group(5);
          StringBuffer line_buffer = new StringBuffer();
          // Continue until we've found the close tag - or run out of output log
          Matcher close_tag_matcher = close_tag_pattern.matcher(payload);
          while (!close_tag_matcher.matches() && (line = import_process_br.readLine()) != null)
          {
            // append any existing payload to the buffer
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(payload);
            // store this line as the payload, should the match below fail
            payload = line;
            close_tag_matcher = close_tag_pattern.matcher(line);
          }
          // We've found the close tag (hopefully) so add last bit (possibly
          // empty string) to value
          if (close_tag_matcher.matches())
          {
            String last_payload = close_tag_matcher.group(1);
            if (line_buffer.length() > 0)
            {
              line_buffer.append("\n");
            }
            line_buffer.append(last_payload);
            last_payload = null;
          }
          close_tag_matcher = null;

          // Construct the compound key by which to sort the entries - note
          // that src is a little different than the others, as we don't care
          // about the timestamp at all - instead wanting to group the entries
          // by src file path (key)
          if (entry_type.equals("src"))
          {
            output_key = new Text(entry_type + " " + entry_key);
          }
          else
          {
            output_key = new Text(entry_type + " " + entry_time);
          }
          // Doc has its payload prefixed by key
          if (entry_type.equals("doc"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text("[" + entry_key + "]\n" + decoded_xml);
          }
          else if (entry_type.equals("src"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(entry_key + "|" + decoded_xml);
          }
          else if (entry_type.equals("rss"))
          {
            String encoded_xml = line_buffer.toString();
            String decoded_xml = encoded_xml.replace("&quot;","\"");
            decoded_xml = decoded_xml.replace("&apos;","'");
            decoded_xml = decoded_xml.replace("&lt;","<");
            decoded_xml = decoded_xml.replace("&gt;",">");
            decoded_xml = decoded_xml.replace("&amp;","&");
            output_value = new Text(decoded_xml);
          }
          else
          {
            output_value = new Text(line_buffer.toString());
          }
          entry_type = null;
          entry_key = null;
          entry_mode = null;
          entry_time = null;
          payload = null;
          line_buffer = null;
        }
        // all other lines we'll key by host and time
        else
        {
          double timestamp = ((double)System.currentTimeMillis())/1000;
          String argument1 = hostname + ":" + cpu_number + ":" + String.format("%.6f", timestamp);
          output_key = new Text("msg " + argument1);
          output_value = new Text(argument1 + " " + line);
        }

        // Store the result in the return context
        context.write(output_key, output_value);

        // May as well do this here - indicate to the Hadoop framework that
        // this process is still making progress (of some form)
        context.progress();

        // Cleanup
        open_tag_matcher = null;
        output_key = null;
        output_value = null;
      }
      open_tag_pattern = null;
      close_tag_pattern = null;
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      //   more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      FileWriter fw2 = new FileWriter(import_process_log, true);
      double end_time = ((double)System.currentTimeMillis())/1000;
      fw2.write("[Completed:" + String.format("%.6f", end_time) + "]\n");
      fw2.close();
    }
    /** map(Text,Text,Context) **/

  }
  /** class GSMap **/


  /** @class GSReducer
   */
  public static class GSReducer
    extends Reducer<Text, Text, Text, Text>
  {

    /** Prepare the Reducer by looking up configuration for this collection to
     *  determine the appropriate tools to use to create databases.
     */
    public void setup(Context context)
    {
    }
    /** setup() **/


    /** Process the values gathered under a compound key. The key's type prefix
     *  (datestamp, doc, src, rss, or anything else, which is treated as a log
     *  message) determines how the values are written out.
     */
    public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
    {
      System.err.println("reduce(" + key.toString() + ", <values>, <context>)");
      Configuration conf = context.getConfiguration();
      String gsdl_home = conf.get("gsdlhome");
      String collection = conf.get("collection");
      String archives_dir = gsdl_home + "/collect/" + collection + "/archives";
      //String archives_dir = conf.get("archivesdir");
      // Eventually I'd like to read the database type in from collect.cfg - or
      // maybe have it passed in as part of the context - but I'll hardcode it
      // as TDB for now as a proof of concept
      String infodbtype = "tdb";

      String key_string = key.toString();
      String[] key_parts = key_string.split(" ");
      if (key_parts.length >= 2)
      {
        String type = key_parts[0];
        String argument = key_parts[1];

        Iterator<Text> values_itr = values.iterator();
        // There are basically five different cases based on the key's type
        // - the first is datestamp... these are sorted earliest first, and
        //   since we only want the earliest we can ignore the rest!
        if (type.equals("datestamp"))
        {
          Text earliest_datestamp = values_itr.next();
          // Write this directly to file
          try
          {
            FileWriter earliest_datestamp_fout = new FileWriter(archives_dir + "/earliestDatestamp");
            earliest_datestamp_fout.write(earliest_datestamp.toString());
            earliest_datestamp_fout.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // For 'doc' types we open a pipe to the database creator and send all
        // the values through for processing
        else if (type.equals("doc"))
        {
          GSInfoDB archiveinf_doc = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-doc." + infodbtype);
          while (values_itr.hasNext())
          {
            Text db_entry = values_itr.next();
            archiveinf_doc.writeEntry(db_entry.toString());
          }
          archiveinf_doc.close();
        }
        // Similarly for 'src' types, except here we group all entries with the
        // same key
        else if (type.equals("src"))
        {
          // Sigh - you can only create the TDB file on a local filesystem, so
          // I have to create it here, and then move it into place when
          // finished
          GSInfoDB archiveinf_src = new GSInfoDB(gsdl_home, infodbtype, archives_dir + "/archiveinf-src." + infodbtype);
          String current_file_path = "";
          StringBuffer current_record = new StringBuffer();
          Pattern file_path_pattern = Pattern.compile("(.*?)\\|(.*)");
          while (values_itr.hasNext())
          {
            Text db_entry_raw = values_itr.next();
            String db_entry = db_entry_raw.toString();
            // Parse out the file path this entry refers to
            Matcher file_path_matcher = file_path_pattern.matcher(db_entry);
            if (file_path_matcher.matches())
            {
              String this_file_path = file_path_matcher.group(1);
              String this_record = file_path_matcher.group(2);
              // Output the record (if there is one) if the file path changes
              if (!this_file_path.equals(current_file_path) && current_record.length() > 0)
              {
                archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
                // store the next record's details
                current_file_path = this_file_path;
                current_record = new StringBuffer(this_record);
              }
              // Append onto our growing record
              else
              {
                current_file_path = this_file_path;
                if (current_record.length() > 0)
                {
                  current_record.append("\n");
                }
                current_record.append(this_record);
              }
            }
            else
            {
              // Not a valid src entry?
            }
          }
          if (!current_file_path.equals("") && current_record.length() > 0)
          {
            archiveinf_src.writeEntry("[" + current_file_path + "]\n" + current_record.toString());
          }
          archiveinf_src.close();
        }
        // For 'rss' we write all the entries - in order - to an XML file
        else if (type.equals("rss"))
        {
          try
          {
            FileWriter rss_item_rdf = new FileWriter(archives_dir + "/rss-items.rdf");
            while (values_itr.hasNext())
            {
              Text rss_entry = values_itr.next();
              rss_item_rdf.write(rss_entry.toString() + "\n");
            }
            rss_item_rdf.close();
          }
          catch (Exception ex)
          {
            ex.printStackTrace();
          }
        }
        // Everything else we assume is just process log messages - so have
        // Hadoop write them, in order, to the log (I may need to annotate
        // these with the host so I can see which message came from which
        // compute node).
        else
        {
          // Each value is "<host>:<cpu>:<timestamp> <message>" (as emitted by
          // the mapper), so split it back into a key and the message text
          Pattern msg_pattern = Pattern.compile("([^\\s]+) (.*)");
          while (values_itr.hasNext())
          {
            String compound_value = values_itr.next().toString();
            Matcher m = msg_pattern.matcher(compound_value);
            if (m.matches())
            {
              Text msg_key = new Text(m.group(1));
              Text msg_value = new Text(m.group(2));
              context.write(msg_key, msg_value);
            }
          }
        }
      }
      else
      {
        System.err.println("Error! Failed to parse key: " + key.toString());
      }
    }
    /** reduce(key, values, context) **/

  }
  /** class GSReducer **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 <gsdlhome> <hadoophome> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }
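
    // Illustrative invocation (the argument values below are examples only,
    // not paths from this project):
    //   bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
    //     /research/gsdl /opt/hadoop mycollection /user/gsdl/archives /hdfs \
    //     hdfs://namenode/user/gsdl/filelist hdfs://namenode/user/gsdl/ingest-out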

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hadoophome", args[1]);
    conf.set("collection", args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix", args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin", args[5]);
    conf.set("hdfsout", args[6]);

    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);

    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest2.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // Register the map, partitioner, grouping comparator, and reducer classes
    job.setMapperClass(GSMap.class);
    job.setPartitionerClass(GSPartitioner.class);
    job.setGroupingComparatorClass(GSGroupingComparator.class);
    job.setReducerClass(GSReducer.class);

    // Sets the input and output handlers - may need to adjust input to provide me
    // a series of filenames (TextInputFormat will instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    //job.setOutputFormatClass(NullOutputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    //   the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    //   this mechanism to produce a time based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my dislike of the ?: syntax
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  /** main(String[]) **/

  /** @function copyFile(File, File)
   *
   * @author Josh Froelich @ stackoverflow.com
   */
  public static void copyFile(File sourceFile, File destFile)
    throws IOException
  {
    if (!destFile.exists())
    {
      destFile.createNewFile();
    }
    FileChannel source = null;
    FileChannel destination = null;
    try
    {
      source = new FileInputStream(sourceFile).getChannel();
      destination = new FileOutputStream(destFile).getChannel();
      destination.transferFrom(source, 0, source.size());
    }
    finally
    {
      if (source != null)
      {
        source.close();
      }
      if (destination != null)
      {
        destination.close();
      }
    }
  }
  /** copyFile(File, File) **/

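  // Note: copyFile() is not called anywhere else in this class; it is retained
  // as a general-purpose helper (presumably for moving locally-built database
  // files into place, as mentioned in the reducer above).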

  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! Because waitFor() is called before the
   * output is read, a command that produces a large amount of output may fill
   * the pipe buffer and block indefinitely.
   *
   */
  public static String runCommand(String command)
  {
    ///ystem.err.println("[DEBUG] command: " + command);
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      pr.waitFor();
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    ///ystem.err.println("[DEBUG] result: " + result.toString());
    return result.toString();
  }
  /** runCommand() **/
}