source: gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java@27654

Last change on this file since 27654 was 27654, checked in by jmt12, 11 years ago

Add the ability to stagger the starting of Mappers by placing a 'delay.me' sentinel file in the tmp directory of the compute node that should be delayed. Mappers on that node are then initially delayed by (compute node number * 100) seconds

File size: 15.7 KB
/** jmt12 **/
package org.nzdl.gsdl;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

/** @class HadoopGreenstoneIngest
 */
public class HadoopGreenstoneIngest
{

  /** @class GSFileRecordReader
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, IntWritable>
  {
    /** Uncompressed file name */
    private Text current_key;
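    /** Dummy value: each input file is emitted exactly once, with a count of 1 */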
    private IntWritable current_value = new IntWritable(1);

    /** Used to indicate progress */
    private boolean is_finished = false;

    /**
     * Record the path of the file this split refers to; it becomes the map key
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     * We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     * Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     * Returns the current key (the full path of the file to be processed)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (always the constant 1)
     */
    @Override
    public IntWritable getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, IntWritable>
  {
    /**
     * Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /**
     * Hand each (unsplit) file to its own GSFileRecordReader
     */
    @Override
    public RecordReader<Text, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

  /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, IntWritable, Text, IntWritable>
  {
    /** @function map
     * The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, IntWritable value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_home = conf.get("hadoophome");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix

      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory (not necessary if path is local or NFS)
      if (hdfs_prefix.equals("/hdfs"))
      {
        file_path = file_path.replaceFirst("hdfs://[^/]*", hdfs_prefix);
      }
      else
      {
        file_path = file_path.replace("hdfs://", hdfs_prefix);
      }

      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }

      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      FileWriter fw1 = new FileWriter(import_process_log, true);
      // MEDUSA Customization: Introduce a slight delay based upon the hostname
      // in order to stagger the startup of Map workers. It looks like the avg
      // IO is around 25 minutes... so let's try to make it so the last mapper
      // starts up 25 minutes after the first (with all others spread in
      // between).
      String hostname = InetAddress.getLocalHost().getHostName();
      // We only do this if there is a sentinel file lurking in tmp
      try
      {
        File delay_file = new File("/tmp/greenstone/delay.me");
        if (delay_file.exists())
        {
          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
          Matcher m = p.matcher(hostname);
          if (m.matches())
          {
            String node_str = m.group(1);
            int node_number = Integer.parseInt(node_str) * 100;
            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds\n");
            Thread.sleep(1000 * node_number);
          }
          // We only do this once for each compute node
          delay_file.delete();
        }
      }
      catch (Exception ie)
      {
        System.err.println(ie.toString());
      }

      // - start the log by writing the time and the manifest line
      long start_time = System.currentTimeMillis()/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(start_time);
      header_block.append("]\n[Host:");
      header_block.append(hostname);
      header_block.append("]\n[CPU:");
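      // Record which CPU this mapper is bound to, using the getcpu helper from
      // the parallel-building extension when it has been built; otherwise log 0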
      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
      File getcpu_executable = new File(getcpu_executable_cmd);
      if (getcpu_executable.exists())
      {
        header_block.append(runCommand(getcpu_executable_cmd));
      }
      else
      {
        header_block.append("0");
      }
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      // existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-archivedir", conf.get("archivesdir"), collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_home + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_home);
      fw1.write("[HADOOP_PREFIX: " + hadoop_home + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));
      // - close our handle on the log before the process starts appending to it
      fw1.close();

      // - write output to log
      import_process_builder.redirectErrorStream(true);
      import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      // more progress
      reporter.interrupt();
      reporter = null; // allow the thread to be garbage collected

      // - write end time to log
      FileWriter fw2 = new FileWriter(import_process_log, true);
      long end_time = System.currentTimeMillis()/1000;
      fw2.write("[Completed:" + end_time + "]\n");
      fw2.close();

      // - for now, return a dummy output. In the future the output from
      // Greenstone could be parsed and passed to a reducer to produce a tidy,
      // time-based log
      context.write(key, value);
    }
    /** map(Text,IntWritable,Context) **/

  }
  /** class GSMap **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 7)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest <gsdlhome> <hadoop home> <collection> <archivesdir> <hdfsprefix> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }
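    // Example invocation (the paths and hostnames below are illustrative only):
    //   bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
    //     /home/gsdl /opt/hadoop demo /hdfs/collect/demo/archives /hdfs \
    //     hdfs://namenode:9000/user/gsdl/demo-in hdfs://namenode:9000/user/gsdl/demo-out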

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hadoophome", args[1]);
    conf.set("collection", args[2]);
    conf.set("archivesdir", args[3]);
    conf.set("hdfsprefix", args[4]); // "HDThriftFS", "HDFSShell", or ""
    conf.set("hdfsin", args[5]);
    conf.set("hdfsout", args[6]);

    // Limit each map task to a single attempt (no retries) - hopefully one of
    // the following property names will be recognised
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 4*60*60*1000; // 4 hours
    conf.setLong("mapred.task.timeout", milli_seconds);
    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Register the mapper class (no combiner or reducer is used)
    job.setMapperClass(GSMap.class);
    // - in theory, the default identity reducer would simply pass the map
    // output straight through; with zero reduce tasks the map output is
    // written out directly
    job.setNumReduceTasks(0);

    // Sets the input and output handlers - may need to adjust input to provide me
    // a series of filenames (TextInputFormat will instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    // the manifest files
    FileInputFormat.setInputPaths(job, new Path(conf.get("hdfsin")));
    // - for now the output isn't that important, but in the future I may use
    // this mechanism to produce a time-based log.
    FileOutputFormat.setOutputPath(job, new Path(conf.get("hdfsout")));

    // Recommended notation despite my dislike of the ?: syntax
    System.exit(job.waitForCompletion(true)?0:1);
  }
  /** main(String[]) **/

  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! Not safe if the command could write a
   * large amount of text to its STDERR stream - it may block indefinitely.
   *
   */
  public static String runCommand(String command)
  {
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      pr.waitFor();
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    return result.toString();
  }
  /** runCommand() **/
}
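
/** Background thread that calls progress() on the supplied Hadoop task context
 *  once a minute, so that a long-running import.pl invocation is not killed by
 *  Hadoop's task timeout.
 */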
class HadoopProgressReporter
  extends Thread
{

  private Context hadoop_process;

  private File log_file;

  HadoopProgressReporter(Context hadoop_process, File log_file)
  {
    this.hadoop_process = hadoop_process;
    // Note: the log_file argument is currently ignored in favour of a fixed
    // debug log path
    //this.log_file = log_file;
    this.log_file = new File("/tmp/hadoop_progress_reporter.log");
  }

  public void run()
  {
    try
    {
      while (!this.isInterrupted())
      {
        sleep(60000); // Wait a minute
        //FileWriter fw1 = new FileWriter(this.log_file, true);
        //long time = System.currentTimeMillis()/1000;
        //fw1.write("[" + time + "] HadoopProgressReporter.progress()\n");
        //fw1.close();
        this.hadoop_process.progress(); // Inform Hadoop we are still processing
      }
    }
    catch (InterruptedException iex)
    {
      // We've been interrupted: no more progress
    }
    catch (Exception ex)
    {
      ex.printStackTrace();
    }
  }
}