- Timestamp:
- 2017-05-01T20:07:17+12:00 (7 years ago)
- Location:
- main/trunk/gli/src/org/greenstone/gatherer
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
main/trunk/gli/src/org/greenstone/gatherer/shell/GShell.java
r31638 r31639 152 152 } */ 153 153 154 // This method is only used by old_runLocal(). If that is removed, then remove this too 154 155 protected StringBuffer get_stream_char(InputStreamReader isr, StringBuffer line_buffer, 155 156 BufferedOutputStream bos) throws IOException … … 224 225 // old_runLocal uses a potentially unsafe way of running a process and an inefficient way of reading 225 226 // from the process stdout and stderr streams. Replaced by new runLocal() which uses SafeProcess and 226 // the new CustomProcessHandler inner class instantiated for each of the 2 process output streams. 227 // the new LineByLineHandler inner class instantiated for each of the 2 process output streams. 228 // If this method is ever removed from the file, can remove its helper get_stream_char() too. 227 229 private void old_runLocal(String[] args, BufferedOutputStream bos) 228 230 { … … 354 356 355 357 prcs = new SafeProcess(args); 356 SafeProcess. CustomProcessHandler processOutHandler357 = new Synchronized ProcessHandler(bos, SafeProcess.STDOUT);358 SafeProcess. CustomProcessHandler processErrHandler359 = new Synchronized ProcessHandler(bos, SafeProcess.STDERR);358 SafeProcess.LineByLineHandler processOutLineHandler 359 = new SynchronizedLineByLineHandler(bos, SafeProcess.STDOUT); 360 SafeProcess.LineByLineHandler processErrLineHandler 361 = new SynchronizedLineByLineHandler(bos, SafeProcess.STDERR); 360 362 361 363 prcs.setExceptionHandler(this); 362 int exitValue = prcs.runProcess( null, processOutHandler, processErrHandler); // use default procIn handling364 int exitValue = prcs.runProcess(processOutLineHandler, processErrLineHandler); // use default procIn handling 363 365 364 366 if(exitValue == 0) { 365 System.err.println("*** Exited normally");367 //System.err.println("*** Exitted normally"); 366 368 status = OK; 367 369 fireMessage(type, typeAsString(type) + "> " + Dictionary.get("GShell.Success"), status, null); 368 370 } 369 371 else { 370 System.err.println("*** Exited abnormally with exit value " + exitValue);372 //System.err.println("*** Exitted abnormally with exit value " + exitValue); 371 373 status = ERROR; 372 374 fireMessage(type, typeAsString(type) + "> " + Dictionary.get("GShell.Failure"), status, null); … … 385 387 386 388 this.interrupt(); // interrupt this thread which is running the SafeProcess prcs 387 // this will propagate the CANCEL status 389 // this will propagate the CANCEL status to any worker threads launched by the SafeProcess 388 390 prcs = null; 389 391 } 390 hasSignalledStop(); // synchronized. Will set status to CANCELLED in thread-safe manner.392 setStatus(CANCELLED); //or: hasSignalledStop(); // synchronized. Either will set status to CANCELLED in thread-safe manner. 391 393 } 392 394 … … 691 693 692 694 if(e instanceof InterruptedException) { 693 SafeProcess.log("*** InterruptedException in GShell.runLocal()"); 695 DebugStream.println("We've been asked to stop."); 696 SafeProcess.log("@@@ Interruption to SafeProcess run by GShell.runLocal()"); 694 697 setStatus(CANCELLED); // expected exception 695 698 } else { … … 706 709 // Each instance of this class is run in its own thread by class SafeProcess.InputGobbler. 707 710 // This class deals with each incoming line from the perl process' stderr or stdout streams. One 708 // instance of this class for each stream. However, since multiple instances of this CustomProcessHandler 709 // could be (and in fact, are) writing to the same file in their own threads, several objects, not just 710 // the bufferedwriter object, needed to be made threadsafe. 711 // This class also handles exceptions during the running of the perl process. 712 // The runPerlCommand code originally would do a sendProcessStatus on each exception, so we ensure 711 // instance of this class for each stream. However, since multiple instances of this LineByLineHandler 712 // are firing off events to the same outputstream object (bos) in their own threads, several objects, 713 // not just bost, needed to be made threadsafe. So gotLine needs to be synchronized, and created a 714 // synchronized setStatus() method above too. 715 // This class also handles exceptions that may occur when reading from the perl process' stderr or stout. 716 // The old GShell.runLocal() code would set the GShell.status to ERROR on each exception, so we ensure 713 717 // we do that here too, to continue original behaviour. These calls are also synchronized to make their 714 718 // use of the EventListeners threadsafe. 715 protected class Synchronized ProcessHandler extends SafeProcess.CustomProcessHandler719 protected class SynchronizedLineByLineHandler extends SafeProcess.LineByLineHandler 716 720 { 717 721 private final BufferedOutputStream bos; // needs to be final to be able to synchronize on the shared object 718 722 719 public Synchronized ProcessHandler(BufferedOutputStream bos, int src) {723 public SynchronizedLineByLineHandler(BufferedOutputStream bos, int src) { 720 724 super(src); // will set this.source to STDERR or STDOUT 721 725 this.bos = bos; // caller will close bw, since many more than one 722 // Synchronized ProcessHandlers are using it726 // SynchronizedLineByLineHandlers are using it 723 727 } 724 728 725 729 // trying to keep synchronized methods as short as possible 726 private void logException(Exception e) { 727 String stream = (this.source == SafeProcess.STDERR) ? "stderr" : "stdout"; 728 String msg = "Got exception when processing the perl process' " + stream + " stream."; 730 public void gotException(Exception e) { 731 String msg = "Got exception when processing the perl process' " + SafeProcess.streamToString(this.source) + " stream."; 729 732 730 733 DebugStream.println(msg); // method is already synchronized … … 732 735 SafeProcess.log("*** " + msg, e); 733 736 734 GShell.this.setStatus(GShell.ERROR); 737 GShell.this.setStatus(GShell.ERROR); // synchronized 735 738 } 736 739 … … 740 743 } 741 744 742 public void run(Closeable inputStream) { 743 InputStream is = (InputStream) inputStream; 744 745 BufferedReader br = null; 746 try { 747 748 br = new BufferedReader(new InputStreamReader(is, "UTF-8")); 749 String line=null; 750 751 ///while ( !GShell.this.prcs.processRunning() || hasSignalledStop()) { // need to sync on prcs 752 while ( !Thread.currentThread().isInterrupted() && (line = br.readLine()) != null ) { 753 this.gotLine(line); // synchronized 754 } 755 756 } catch (Exception e) { 757 logException(e); 758 } finally { 759 System.err.println("*********"); 760 if(Thread.currentThread().isInterrupted()) { 761 log("We've been asked to stop."); 762 763 } 764 SafeProcess.closeResource(br); 765 String stream = (this.source == SafeProcess.STDERR) ? "stderr" : "stdout"; 766 System.err.println("*** In finally of " + stream + " stream"); 767 System.err.println("*********"); 768 } 769 } 770 771 protected synchronized void gotLine(String line) { 745 // every time we read a line from the SafeProcess' stderr or stdout stream we come here. 746 // a different isntance of SynchronizedL 747 public synchronized void gotLine(String line) { 772 748 fireMessage(GShell.this.type, // not final, needs synchro 773 749 GShell.this.typeAsString(type) + "> " + line, -
main/trunk/gli/src/org/greenstone/gatherer/util/SafeProcess.java
r31638 r31639 21 21 22 22 public class SafeProcess { 23 //public static int DEBUG = 0; 23 24 24 25 public static final int STDERR = 0; … … 99 100 public void setSplitStdErrorNewLines(boolean split) { 100 101 splitStdErrorNewLines = split; 101 }102 103 // logger and DebugStream print commands are synchronized, therefore thread safe.104 public static void log(String msg) {105 //logger.info(msg);106 107 System.err.println(msg);108 109 //DebugStream.println(msg);110 }111 112 public static void log(String msg, Exception e) { // Print stack trace on the exception113 //logger.error(msg, e);114 115 System.err.println(msg);116 e.printStackTrace();117 118 //DebugStream.println(msg);119 //DebugStream.printStackTrace(e);120 }121 122 public static void log(Exception e) {123 //logger.error(e);124 125 e.printStackTrace();126 127 //DebugStream.printStackTrace(e);128 }129 130 public static void log(String msg, Exception e, boolean printStackTrace) {131 if(printStackTrace) {132 log(msg, e);133 } else {134 log(msg);135 }136 102 } 137 103 … … 184 150 this.exitValue = process.waitFor(); // can throw an InterruptedException if process did not terminate 185 151 } catch(InterruptedException ie) { 186 187 152 log("*** Process interrupted (InterruptedException). Expected to be a Cancel operation."); 188 153 // don't print stacktrace: an interrupt here is not an error, it's expected to be a cancel action 154 if(exceptionHandler != null) { 155 exceptionHandler.gotException(ie); 156 } 189 157 190 158 // propagate interrupts to worker threads here … … 204 172 // to swallow the interrupt this time and not let it propagate by commenting out the next line? 205 173 //Thread.currentThread().interrupt(); // re-interrupt the thread 206 174 207 175 inputGobbler.interrupt(); 208 176 errorGobbler.interrupt(); 209 177 outputGobbler.interrupt(); 210 178 179 // even after the interrupts, we want to proceed to calling join() on all the worker threads 180 // in order to wait for each of them to die before attempting to destroy the process if it 181 // still hasn't terminated after all that. 211 182 } finally { 212 183 … … 358 329 359 330 } finally { 360 361 log("*** In finally of SafeProcess.runProcess(3 params)");331 //String cmd = (this.command == null) ? Arrays.toString(this.command_args) : this.command; 332 //log("*** In finally of SafeProcess.runProcess(3 params): " + cmd); 362 333 363 334 if( process != null ) { … … 440 411 // http://www.javaworld.com/article/2071275/core-java/when-runtime-exec---won-t.html?page=2 441 412 // http://mark.koli.ch/leaky-pipes-remember-to-close-your-streams-when-using-javas-runtimegetruntimeexec 413 414 //String cmd = (this.command == null) ? Arrays.toString(this.command_args) : this.command; 415 //log("*** In finally of SafeProcess.runProcess(2 params): " + cmd); 416 442 417 if( process != null ) { 443 418 log("*** Going to call process.destroy 1"); … … 446 421 log("*** Have called process.destroy 1"); 447 422 } 448 log("*** In finally of SafeProcess.runProcess(2 params)");449 423 } 450 424 … … 484 458 protected CustomProcessHandler(int src) { 485 459 this.source = src; // STDERR or STDOUT or STDIN 486 487 // modify threadname to prefix stream src (stdin/stderr/stdout) 488 // Useful for debugging if thread is named clearly 489 String stream; 490 switch(src) { 491 case SafeProcess.STDERR: 492 stream = "stderr"; 493 case SafeProcess.STDOUT: 494 stream = "stdout"; 495 default: 496 stream = "stdin"; 497 } 498 Thread.currentThread().setName(stream + Thread.currentThread().getName()); 460 } 461 462 public String getThreadNamePrefix() { 463 return SafeProcess.streamToString(this.source); 499 464 } 500 465 … … 510 475 protected LineByLineHandler(int src) { 511 476 this.source = src; // STDERR or STDOUT 477 } 478 479 public String getThreadNamePrefix() { 480 return SafeProcess.streamToString(this.source); 512 481 } 513 482 … … 530 499 private LineByLineHandler lineByLineHandler = null; 531 500 501 protected InputStreamGobbler() { 502 super("InputStreamGobbler"); 503 } 504 532 505 public InputStreamGobbler(InputStream is) 533 506 { 534 super("InputStreamGobbler"); //thread name507 this(); // sets thread name 535 508 this.is = is; 536 509 this.split_newlines = false; … … 539 512 public InputStreamGobbler(InputStream is, boolean split_newlines) 540 513 { 541 super("InputStreamGobbler"); //thread name514 this(); // sets thread name 542 515 this.is = is; 543 516 this.split_newlines = split_newlines; … … 546 519 public InputStreamGobbler(InputStream is, CustomProcessHandler customHandler) 547 520 { 548 super("InputStreamGobbler"); // thread name521 this(); // thread name 549 522 this.is = is; 550 523 this.customHandler = customHandler; 524 this.adjustThreadName(customHandler.getThreadNamePrefix()); 525 } 526 527 528 private void adjustThreadName(String prefix) { 529 this.setName(prefix + this.getName()); 551 530 } 552 531 553 532 public void setLineByLineHandler(LineByLineHandler lblHandler) { 554 533 this.lineByLineHandler = lblHandler; 534 this.adjustThreadName(lblHandler.getThreadNamePrefix()); 555 535 } 556 536 … … 579 559 lineByLineHandler.gotException(ioe); 580 560 } else { 581 log("Exception when reading from a process' stdout/stderr stream: ", ioe);561 log("Exception when reading process stream with " + this.getName() + ": ", ioe); 582 562 } 583 563 } finally { 584 log("*********");585 564 if(this.isInterrupted()) { 586 log(" We've been asked to stop.");565 log("@@@ Successfully interrupted " + this.getName() + "."); 587 566 } 588 567 SafeProcess.closeResource(br); 589 log("*** In finally of " + this.getName());590 log("*********");591 568 } 592 569 } … … 619 596 private CustomProcessHandler customHandler = null; 620 597 598 protected OutputStreamGobbler() { 599 super("stdinOutputStreamGobbler"); // thread name 600 } 601 621 602 public OutputStreamGobbler(OutputStream os) { 622 super("OutputStreamGobbler"); //thread name603 this(); // set thread name 623 604 this.os = os; 624 605 } … … 626 607 public OutputStreamGobbler(OutputStream os, String inputstr) 627 608 { 628 super("OutputStreamGobbler"); //thread name609 this(); // set thread name 629 610 this.os = os; 630 611 this.inputstr = inputstr; … … 632 613 633 614 public OutputStreamGobbler(OutputStream os, CustomProcessHandler customHandler) { 634 super("OutputStreamGobbler"); //thread name615 this(); // set thread name 635 616 this.os = os; 636 617 this.customHandler = customHandler; … … 692 673 } // end static inner class OutputStreamGobbler 693 674 675 //**************** Static methods **************// 676 677 678 // logger and DebugStream print commands are synchronized, therefore thread safe. 679 public static void log(String msg) { 680 //logger.info(msg); 681 682 System.err.println(msg); 683 684 //DebugStream.println(msg); 685 } 686 687 public static void log(String msg, Exception e) { // Print stack trace on the exception 688 //logger.error(msg, e); 689 690 System.err.println(msg); 691 e.printStackTrace(); 692 693 //DebugStream.println(msg); 694 //DebugStream.printStackTrace(e); 695 } 696 697 public static void log(Exception e) { 698 //logger.error(e); 699 700 e.printStackTrace(); 701 702 //DebugStream.printStackTrace(e); 703 } 704 705 public static void log(String msg, Exception e, boolean printStackTrace) { 706 if(printStackTrace) { 707 log(msg, e); 708 } else { 709 log(msg); 710 } 711 } 712 713 public static String streamToString(int src) { 714 String stream; 715 switch(src) { 716 case STDERR: 717 stream = "stderr"; 718 break; 719 case STDOUT: 720 stream = "stdout"; 721 break; 722 default: 723 stream = "stdin"; 724 } 725 return stream; 726 } 694 727 695 728 //**************** Useful static methods. Copied from GLI's Utility.java ******************
Note:
See TracChangeset
for help on using the changeset viewer.