Changeset 27102
- Timestamp: 2013-03-20T13:00:34+13:00 (11 years ago)
- Files: 1 edited
Legend:
- Unmodified
- Added
- Removed
gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java
r26982 r27102 13 13 import java.lang.ProcessBuilder; 14 14 import java.lang.ProcessBuilder.*; 15 import java.lang.Thread; 15 16 import java.net.InetAddress; 16 17 import java.util.Map; … … 20 21 import org.apache.hadoop.io.*; 21 22 import org.apache.hadoop.mapreduce.*; 23 import org.apache.hadoop.mapreduce.Mapper.Context; 22 24 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 23 25 import org.apache.hadoop.mapreduce.lib.input.FileSplit; … … 234 236 import_process_builder.redirectErrorStream(true); 235 237 import_process_builder.redirectOutput(Redirect.appendTo(import_process_log)); 238 239 // - create progress reporter (so Hadoop doesn't time us out) 240 Thread reporter = new HadoopProgressReporter(context, import_process_log); 241 reporter.start(); 242 236 243 // - run process 237 244 Process import_process = import_process_builder.start(); … … 247 254 { 248 255 System.err.println("Error! Import command failed (" + e.toString() + ")"); 249 System.exit(0); 250 } 256 } 257 258 // - stop the progress reporter as, one way or another, there will be no 259 // more progress 260 reporter.interrupt(); 261 reporter = null; // force gc 251 262 252 263 // - write end time to log … … 283 294 conf.set("hadoopprefix", args[3]); 284 295 conf.set("collection", args[4]); 296 // Set the number of retries to 1 - hopefully one of the following will work 297 conf.setInt("mapred.map.max.attempts", 1); // Old Hadoop 298 conf.setInt("mapreduce.map.maxattempts", 1); // Hadoop 2.0.3-alpha 299 conf.setInt("mapreduce.map.max.attempts", 1); // Solution on Web 285 300 // prevent timeouts 286 301 long milli_seconds = 60*60*1000; // 1 hour 287 302 conf.setLong("mapred.task.timeout", milli_seconds); 288 289 303 Job job = new Job(conf, "hadoopgreenstoneingest"); 290 304 job.setJarByClass(HadoopGreenstoneIngest.class); … … 319 333 /** main(String[]) **/ 320 334 } 335 336 class HadoopProgressReporter 337 extends Thread 338 { 339 340 private Context hadoop_process; 341 342 private File 
log_file; 343 344 HadoopProgressReporter(Context hadoop_process, File log_file) 345 { 346 this.hadoop_process = hadoop_process; 347 this.log_file = log_file; 348 } 349 350 public void run() 351 { 352 try 353 { 354 while (!this.isInterrupted()) 355 { 356 sleep(60000); // Wait a minute 357 //FileWriter fw1 = new FileWriter(this.log_file, true); 358 //long time = System.currentTimeMillis()/1000; 359 //fw1.write("[" + time + "] HadoopProgressReporter.progress()\n"); 360 //fw1.close(); 361 this.hadoop_process.progress(); // Inform Hadoop we are still processing 362 } 363 } 364 catch (InterruptedException iex) 365 { 366 // We've been interrupted: no more progress 367 } 368 catch (Exception ex) 369 { 370 ex.printStackTrace(); 371 } 372 } 373 }
Note:
See TracChangeset
for help on using the changeset viewer.