source: gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java@ 30354

Last change on this file since 30354 was 30354, checked in by jmt12, 8 years ago

Extending manifest v2 support to allow directories to be listed in the manifest. Matched with changes in the Directory plugin to allow paths into systems like HDFS to be listed in the manifest.

File size: 16.3 KB
/** jmt12 **/
package org.nzdl.gsdl;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

/** @class HadoopGreenstoneIngest
 */
public class HadoopGreenstoneIngest
{

  /** @class GSFileRecordReader
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, IntWritable>
  {
    /** Uncompressed file name */
    private Text current_key;

    private IntWritable current_value = new IntWritable(1);

    /** Used to indicate progress */
    private boolean is_finished = false;

    /**
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     * We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     * Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     * Returns the current key (the full path of the file to process)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (a constant count of 1)
     */
    @Override
    public IntWritable getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, IntWritable>
  {
    /**
     * Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /**
     */
    @Override
    public RecordReader<Text, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

  /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, IntWritable, Text, IntWritable>
  {
    /** @function map
     * The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, IntWritable value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix
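      // For illustration (the exact format depends on the Hadoop version): a task id
      // such as "attempt_201312060928_0001_m_000000_0" becomes
      // "201312060928_0001_m_000000_0", and is used below to give this mapper its
      // own log and manifest file names.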

      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }
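      // For illustration (host and paths are hypothetical): with a prefix of "/hdfs",
      // "hdfs://namenode:9000/user/gsdl/import/doc.xml" is rewritten to
      // "/hdfs/user/gsdl/import/doc.xml"; otherwise the "hdfs://" scheme is simply
      // replaced by the configured prefix (e.g. "HDThriftFS" or "HDFSShell").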

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so let's try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
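      // For illustration: node numbers are spaced 100 seconds apart, so a mapper on
      // "compute-0-5.local" would log a 500 second delay and sleep for that long
      // before starting.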
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
            Thread.sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      long start_time = System.currentTimeMillis()/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(start_time);
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      if (getcpu_executable.exists())
      {
        header_block.append(runCommand(getcpu_executable_cmd));
      }
      else
      {
        header_block.append("0");
      }
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      // existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();
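      // For illustration (the path shown is hypothetical), the manifest written
      // above looks like:
      //   <Manifest version="2.0">
      //     <Index>
      //       <Filename>/hdfs/user/gsdl/collect/demo/import/doc.xml</Filename>
      //     </Index>
      //   </Manifest>
      // With manifest version 2, the listed path may also be a directory (handled by
      // the matching Directory plugin change noted in the changelog above).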

      // - call Greenstone passing in the path to the manifest
      //ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      String environment_script_filename = "setup.bash";
      StringBuffer cmd_buffer = new StringBuffer();
      cmd_buffer.append("source ./");
      cmd_buffer.append(environment_script_filename);
      cmd_buffer.append(" && time -p import.pl -keepold -manifest \"");
      cmd_buffer.append(manifest_path.toString());
      cmd_buffer.append("\" -archivedir \"");
      cmd_buffer.append(conf.get("archivesdir"));
      cmd_buffer.append("\" ");
      cmd_buffer.append(collection);
      ProcessBuilder import_process_builder = new ProcessBuilder("bash", "-c", cmd_buffer.toString());
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
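      // For illustration (archives directory and collection name are hypothetical),
      // the command handed to bash looks like:
      //   source ./setup.bash && time -p import.pl -keepold \
      //     -manifest "/tmp/greenstone/manifest<task_id>.xml" \
      //     -archivedir "/hdfs/user/gsdl/collect/demo/archives" demo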
      /*
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - build up the path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");
      */

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));
      // - close our output to the log before opening in the process
      fw1.close();

      // - write output to log
      import_process_builder.redirectErrorStream(true);
      import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      // more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      FileWriter fw2 = new FileWriter(import_process_log, true);
      long end_time = System.currentTimeMillis()/1000;
      fw2.write("[Completed:" + end_time + "]\n");
      fw2.close();

      // - for now return a dummy output. In the future I may want to parse the
      // output from Greenstone and allow reducing to produce a pretty
      // time-based log
      context.write(key, value);
    }
    /** map(Text,IntWritable,Context) **/

  }
  /** class GSMap **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest <gsdlhome> <hadoophome> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }
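    // For illustration (all paths and names are hypothetical):
    //   bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
    //     /home/gsdl /opt/hadoop demo /hdfs/user/gsdl/collect/demo/archives \
    //     /hdfs hdfs:///user/gsdl/hdfsin hdfs:///user/gsdl/hdfsout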

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hadoophome", args[1]);
    conf.set("collection", args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix", args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin", args[5]);
    conf.set("hdfsout", args[6]);

    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);
    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Register the map, combiner, and reducer classes
    job.setMapperClass(GSMap.class);
    // - in theory, uses the IdentityReducer by default, which simply returns
    // the input as the output (so no processing)
    job.setNumReduceTasks(0);

    // Sets the input and output handlers - the input may need adjusting to provide
    // a series of filenames (TextInputFormat would instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    // the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    // this mechanism to produce a time-based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my dislike of the ?: syntax
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  /** main(String[]) **/

  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! Not safe if the command could return a
   * large amount of text in the STDERR stream - may block indefinitely.
   *
   */
  public static String runCommand(String command)
  {
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      pr.waitFor();
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    return result.toString();
  }
  /** runCommand() **/
}

class HadoopProgressReporter
  extends Thread
{

  private Context hadoop_process;

  private File log_file;

  HadoopProgressReporter(Context hadoop_process, File log_file)
  {
    this.hadoop_process = hadoop_process;
    //this.log_file = log_file;
    this.log_file = new File("/tmp/hadoop_progress_reporter.log");
  }

  public void run()
  {
    try
    {
      while (!this.isInterrupted())
      {
        sleep(60000); // Wait a minute
        //FileWriter fw1 = new FileWriter(this.log_file, true);
        //long time = System.currentTimeMillis()/1000;
        //fw1.write("[" + time + "] HadoopProgressReporter.progress()\n");
        //fw1.close();
        this.hadoop_process.progress(); // Inform Hadoop we are still processing
      }
    }
    catch (InterruptedException iex)
    {
      // We've been interrupted: no more progress
    }
    catch (Exception ex)
    {
      ex.printStackTrace();
    }
  }
}