source: gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java@ 27546

Last change on this file since 27546 was 27546, checked in by jmt12, 11 years ago

Adding the ability for the Hadoop Mapper to determine what CPU number it is running on (at that moment, which is the best I can do). This is useful for multicore machines, as otherwise there is no way to tell the 'worker processes' apart
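For context on the change above: in the file below the mapper obtains the CPU number by shelling out to an external getcpu helper (see the runCommand("getcpu") call in GSMap.map); that helper script is not part of this file. As a rough, Linux-only sketch of how such a lookup could be done directly from Java, one can read the "processor" field (field 39 in proc(5)) from /proc/self/stat. The class and method names below are illustrative only and do not appear in the repository.

// Illustrative sketch only (not part of this changeset): reads the CPU number
// the current process last ran on from /proc/self/stat. Linux-specific.
import java.io.BufferedReader;
import java.io.FileReader;

public class CurrentCpuSketch
{
  public static int currentCpu() throws Exception
  {
    BufferedReader reader = new BufferedReader(new FileReader("/proc/self/stat"));
    String stat = reader.readLine();
    reader.close();
    // The second field (comm) can contain spaces, so split after the closing ')'
    String[] fields = stat.substring(stat.lastIndexOf(')') + 1).trim().split("\\s+");
    // fields[0] is field 3 of proc(5) ("state"); field 39 ("processor") is fields[36]
    return Integer.parseInt(fields[36]);
  }
}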

File size: 13.9 KB
/** jmt12 **/
package org.nzdl.gsdl;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.ProcessBuilder;
import java.lang.ProcessBuilder.*;
import java.lang.Thread;
import java.net.InetAddress;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

/** @class HadoopGreenstoneIngest
 */
public class HadoopGreenstoneIngest
{

  /** @class GSFileRecordReader
   */
  public static class GSFileRecordReader
    extends RecordReader<Text, IntWritable>
  {
    /** The path of the file to process, used as the record key */
    private Text current_key;

    private IntWritable current_value = new IntWritable(1);

    /** Used to indicate progress */
    private boolean is_finished = false;

    /**
     */
    @Override
    public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext )
      throws IOException, InterruptedException
    {
      FileSplit split = (FileSplit) inputSplit;
      current_key = new Text(split.getPath().toString());
    }
    /** initialize() **/

    /**
     * We only ever have a single key/value
     */
    @Override
    public boolean nextKeyValue()
      throws IOException, InterruptedException
    {
      if (!is_finished)
      {
        is_finished = true;
        return true;
      }
      return false;
    }
    /** nextKeyValue() **/

    /** @function getProgress
     * Rather than calculating progress, we just keep it simple
     */
    @Override
    public float getProgress()
      throws IOException, InterruptedException
    {
      return is_finished ? 1 : 0;
    }
    /** getProgress() **/

    /**
     * Returns the current key (the path of the file to process)
     */
    @Override
    public Text getCurrentKey()
      throws IOException, InterruptedException
    {
      return current_key;
    }
    /** getCurrentKey() **/

    /**
     * Returns the current value (always the constant 1)
     */
    @Override
    public IntWritable getCurrentValue()
      throws IOException, InterruptedException
    {
      return current_value;
    }
    /** getCurrentValue() **/

    /**
     * Close quietly, ignoring any exceptions
     */
    @Override
    public void close()
      throws IOException
    {
      // nothing to do
    }
    /** close() **/

  }
  /** GSFileRecordReader **/

  /** @class GSFileInputFormat
   */
  public static class GSFileInputFormat
    extends FileInputFormat<Text, IntWritable>
  {
    /**
     * Don't split the files
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename)
    {
      return false;
    }
    /** isSplitable() **/

    /**
     */
    @Override
    public RecordReader<Text, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException
    {
      return new GSFileRecordReader();
    }
    /** createRecordReader() **/

  }
  /** class GSFileInputFormat **/

  /** @class GSMap
   */
  public static class GSMap
    extends Mapper<Text, IntWritable, Text, IntWritable>
  {
    /** @function map
     * The key is the full path (HDFS) of the file to be processed.
     */
    public void map(Text key, IntWritable value, Context context)
      throws IOException, InterruptedException
    {
      String file_path = key.toString();
      // - configuration for the task
      Configuration conf = context.getConfiguration();
      String gsdlhome = conf.get("gsdlhome");
      String hdfs_prefix = conf.get("hdfsprefix");
      String hadoop_prefix = conf.get("hadoopprefix");
      String collection = conf.get("collection");
      String task_id = conf.get("mapred.task.id");
      task_id = task_id.substring(8); // remove "attempt_" prefix
      // Programmatically rewrite the protocol as appropriate for the given
      // archives directory
      file_path = file_path.replace("hdfs://", hdfs_prefix);
      // - create a temporary directory
      File greenstone_tmp_dir = new File("/tmp/greenstone");
      if (!greenstone_tmp_dir.isDirectory())
      {
        greenstone_tmp_dir.mkdir();
      }
      // - open a unique log file
      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
      // - start the log by writing the time and the manifest line
      FileWriter fw1 = new FileWriter(import_process_log, true);
      long start_time = System.currentTimeMillis()/1000;
      StringBuffer header_block = new StringBuffer("[Started:");
      header_block.append(start_time);
      header_block.append("]\n[Host:");
      header_block.append(InetAddress.getLocalHost().getHostName());
      header_block.append("]\n[CPU:");
      header_block.append(runCommand("getcpu"));
      header_block.append("]\n[Task:");
      header_block.append(task_id);
      header_block.append("]\n[Map:");
      header_block.append(file_path);
      header_block.append("=>");
      header_block.append(value);
      header_block.append("]\n");
      fw1.write(header_block.toString());
      header_block = null;

      // - create a temporary manifest file to process this file. Overwrite any
      //   existing file
      File manifest_path = new File("/tmp/greenstone/manifest" + task_id + ".xml");
      FileWriter manifest_writer = new FileWriter(manifest_path);
      manifest_writer.write("<Manifest version=\"2.0\">\n");
      manifest_writer.write("\t<Index>\n");
      manifest_writer.write("\t\t<Filename>" + file_path + "</Filename>\n");
      manifest_writer.write("\t</Index>\n");
      manifest_writer.write("</Manifest>\n");
      manifest_writer.close();

      // - call Greenstone passing in the path to the manifest
      ProcessBuilder import_process_builder
        = new ProcessBuilder("time", "-p", "import.pl", "-manifest", manifest_path.toString(), "-keepold", "-verbosity", "42", "-archivedir", hdfs_prefix + "/user/jmt12/gsdl/collect/" + collection + "/archives", collection);
      fw1.write("[Command:" + import_process_builder.command() + "]\n");
      // - alter environment
      Map<String, String> import_process_env = import_process_builder.environment();
      // - path
      String path = import_process_env.get("PATH");
      path = gsdlhome + "/ext/parallel-building/bin/script:" + path;
      path = gsdlhome + "/ext/parallel-building/linux/bin:" + path;
      path = hadoop_prefix + "/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/linux/bin:" + path;
      path = gsdlhome + "/ext/tdb-edit/bin/script:" + path;
      path = gsdlhome + "/ext/video-and-audio/linux/bin:" + path;
      path = gsdlhome + "/bin/script:" + path;
      path = gsdlhome + "/bin/linux:" + path;
      import_process_env.put("PATH", path);
      fw1.write("[PATH: " + path + "]\n");
      // - ld_library_path
      import_process_env.put("LD_LIBRARY_PATH", gsdlhome + "/ext/parallel-building/linux/lib:" + gsdlhome + "/ext/hadoop/linux/lib:" + gsdlhome + "/ext/video-and-audio/linux/lib:" + gsdlhome + "/ext/tdb-edit/linux/lib");
      // - dyld_library_path
      import_process_env.put("DYLD_LIBRARY_PATH", gsdlhome + "/ext/video-and-audio/linux/lib");
      // - misc
      import_process_env.put("GSDLHOME", gsdlhome);
      import_process_env.put("GSDLOS", "linux");
      import_process_env.put("GSDLEXTS", "parallel-building:tdb-edit:video-and-audio");
      // - installed extension paths
      import_process_env.put("GEXTPARALLELBUILDING", gsdlhome + "/ext/parallel-building");
      import_process_env.put("GEXTPARALLELBUILDING_INSTALLED", gsdlhome + "/ext/parallel-building/linux");
      import_process_env.put("GEXTTDBEDIT_INSTALLED", gsdlhome + "/ext/tdb-edit/linux");
      import_process_env.put("GEXTVIDEO_INSTALLED", gsdlhome + "/ext/video-and-audio/linux");
      // - Hadoop specific
      import_process_env.put("HADOOP_PREFIX", hadoop_prefix);
      fw1.write("[HADOOP_PREFIX: " + hadoop_prefix + "]\n");

      // - change working directory
      import_process_builder.directory(new File(gsdlhome));
      // - close our output to the log before the import process starts appending to it
      fw1.close();

      // - write output to log
      import_process_builder.redirectErrorStream(true);
      import_process_builder.redirectOutput(Redirect.appendTo(import_process_log));

      // - create progress reporter (so Hadoop doesn't time us out)
      Thread reporter = new HadoopProgressReporter(context, import_process_log);
      reporter.start();

      // - run process
      Process import_process = import_process_builder.start();
      try
      {
        int import_status = import_process.waitFor();
        if (import_status != 0)
        {
          throw new Exception("exit status: " + import_status);
        }
      }
      catch (Exception e)
      {
        System.err.println("Error! Import command failed (" + e.toString() + ")");
      }

      // - stop the progress reporter as, one way or another, there will be no
      //   more progress
      reporter.interrupt();
      reporter = null; // force gc

      // - write end time to log
      FileWriter fw2 = new FileWriter(import_process_log, true);
      long end_time = System.currentTimeMillis()/1000;
      fw2.write("[Completed:" + end_time + "]\n");
      fw2.close();

      // - for now return a dummy output. In the future I may want to parse the
      //   output from Greenstone and allow a reduce step to produce a pretty
      //   time-based log
      context.write(key, value);
    }
    /** map(Text,IntWritable,Context) **/

  }
  /** class GSMap **/

  /** @function main
   */
  public static void main(String[] args)
    throws Exception
  {
    if (args.length < 6)
    {
      System.out.println("Usage: bin/hadoop jar hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest <gsdlhome> <hdfsprefix> <hadoop prefix> <collection> <hdfsin> <hdfsout>\n");
      System.exit(0);
    }

    Configuration conf = new Configuration();
    conf.set("gsdlhome", args[0]);
    conf.set("hdfsprefix", args[1]); // HDThriftFS or HDFSShell
    conf.set("hadoopprefix", args[2]);
    conf.set("collection", args[3]);
    // Set the number of retries to 1 - hopefully one of the following will work
    conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop
    conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha
    conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web
    // prevent timeouts
    long milli_seconds = 60*60*1000; // 1 hour
    conf.setLong("mapred.task.timeout", milli_seconds);
    Job job = new Job(conf, "hadoopgreenstoneingest");
    job.setJarByClass(HadoopGreenstoneIngest.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Register the map, combiner, and reducer classes
    job.setMapperClass(GSMap.class);
    // - in theory, uses the IdentityReducer by default, which simply returns
    //   the input as the output (so no processing)
    job.setNumReduceTasks(0);

    // Sets the input and output handlers - may need to adjust input to provide me
    // a series of filenames (TextInputFormat will instead read in a text file and
    // return each line...)
    job.setInputFormatClass(GSFileInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    //job.setOutputFormatClass(TextOutputFormat.class);

    // Register the input and output paths
    // - this input path should be to a file (in HDFS) that lists the paths to
    //   the manifest files
    FileInputFormat.setInputPaths(job, new Path(args[4]));
    // - for now the output isn't that important, but in the future I may use
    //   this mechanism to produce a time based log.
    FileOutputFormat.setOutputPath(job, new Path(args[5]));

    // Recommended notation despite my hatred of the ?: syntax
    System.exit(job.waitForCompletion(true)?0:1);
  }
  /** main(String[]) **/

  /** @function runCommand()
   *
   * A convenience method that calls an external command and returns its
   * standard out as a string. Warning! Not safe if the command could return a
   * large amount of text in the STDERR stream - may infinitely block.
   *
   */
  public static String runCommand(String command)
  {
    StringBuffer result = new StringBuffer();
    try
    {
      Runtime run = Runtime.getRuntime();
      Process pr = run.exec(command);
      // Drain standard out before waiting, otherwise a command that fills the
      // pipe buffer would block forever
      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
      String line;
      while ((line = buf.readLine()) != null)
      {
        result.append(line);
      }
      pr.waitFor();
    }
    catch (Exception ex)
    {
      System.err.println("Error! " + ex.getMessage());
    }
    return result.toString();
  }
  /** runCommand() **/
}

/** Periodically calls Context.progress() so that a long-running import
 *  process is not killed by Hadoop's task timeout.
 */
class HadoopProgressReporter
  extends Thread
{

  private Context hadoop_process;

  private File log_file;

  HadoopProgressReporter(Context hadoop_process, File log_file)
  {
    this.hadoop_process = hadoop_process;
    this.log_file = log_file;
  }

  public void run()
  {
    try
    {
      while (!this.isInterrupted())
      {
        sleep(60000); // Wait a minute
        //FileWriter fw1 = new FileWriter(this.log_file, true);
        //long time = System.currentTimeMillis()/1000;
        //fw1.write("[" + time + "] HadoopProgressReporter.progress()\n");
        //fw1.close();
        this.hadoop_process.progress(); // Inform Hadoop we are still processing
      }
    }
    catch (InterruptedException iex)
    {
      // We've been interrupted: no more progress
    }
    catch (Exception ex)
    {
      ex.printStackTrace();
    }
  }
}