Changeset 31639

Show
Ignore:
Timestamp:
01.05.2017 20:07:17 (2 years ago)
Author:
ak19
Message:

1. Correcting switch statement in SafeProcess? and tidying up some debugging. 2. GShell needs to have its own LineByLineHandlers? not CustomProcessHandlers?, since most of the err/out stream processing code should be the default provided by SafeProcess?. 3. GShell.gotException wasn't being called during an expected InterruptedException? on Cancel, so corrected that to make the code write a message to the DebugStream? as the original code did when a build operation had been cancelled.

Location:
main/trunk/gli/src/org/greenstone/gatherer
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • main/trunk/gli/src/org/greenstone/gatherer/shell/GShell.java

    r31638 r31639  
    152152       } */ 
    153153 
     154    // This method is only used by old_runLocal(). If that is removed, then remove this too 
    154155    protected StringBuffer get_stream_char(InputStreamReader isr, StringBuffer line_buffer, 
    155156                   BufferedOutputStream bos) throws IOException  
     
    224225    // old_runLocal uses a potentially unsafe way of running a process and an inefficient way of reading 
    225226    // from the process stdout and stderr streams. Replaced by new runLocal() which uses SafeProcess and 
    226     // the new CustomProcessHandler inner class instantiated for each of the 2 process output streams. 
     227    // the new LineByLineHandler inner class instantiated for each of the 2 process output streams. 
     228    // If this method is ever removed from the file, can remove its helper get_stream_char() too. 
    227229    private void old_runLocal(String[] args, BufferedOutputStream bos)  
    228230    { 
     
    354356     
    355357    prcs = new SafeProcess(args); 
    356     SafeProcess.CustomProcessHandler processOutHandler 
    357         = new SynchronizedProcessHandler(bos, SafeProcess.STDOUT); 
    358     SafeProcess.CustomProcessHandler processErrHandler 
    359         = new SynchronizedProcessHandler(bos, SafeProcess.STDERR); 
     358    SafeProcess.LineByLineHandler processOutLineHandler 
     359        = new SynchronizedLineByLineHandler(bos, SafeProcess.STDOUT); 
     360    SafeProcess.LineByLineHandler processErrLineHandler 
     361        = new SynchronizedLineByLineHandler(bos, SafeProcess.STDERR); 
    360362     
    361363    prcs.setExceptionHandler(this);  
    362     int exitValue = prcs.runProcess(null, processOutHandler, processErrHandler); // use default procIn handling 
     364    int exitValue = prcs.runProcess(processOutLineHandler, processErrLineHandler); // use default procIn handling 
    363365     
    364366    if(exitValue == 0) { 
    365         System.err.println("*** Exited normally"); 
     367        //System.err.println("*** Exitted normally"); 
    366368        status = OK; 
    367369        fireMessage(type, typeAsString(type) + "> " + Dictionary.get("GShell.Success"), status, null); 
    368370    } 
    369371    else { 
    370         System.err.println("*** Exited abnormally with exit value " + exitValue); 
     372        //System.err.println("*** Exitted abnormally with exit value " + exitValue); 
    371373        status = ERROR; 
    372374        fireMessage(type, typeAsString(type) + "> " + Dictionary.get("GShell.Failure"), status, null); 
     
    385387 
    386388        this.interrupt(); // interrupt this thread which is running the SafeProcess prcs 
    387         // this will propagate the CANCEL status 
     389        // this will propagate the CANCEL status to any worker threads launched by the SafeProcess 
    388390        prcs = null;         
    389391    } 
    390     hasSignalledStop(); // synchronized. Will set status to CANCELLED in thread-safe manner.     
     392    setStatus(CANCELLED); //or: hasSignalledStop(); // synchronized. Either will set status to CANCELLED in thread-safe manner.  
    391393    } 
    392394     
     
    691693 
    692694    if(e instanceof InterruptedException) { 
    693         SafeProcess.log("*** InterruptedException in GShell.runLocal()"); 
     695        DebugStream.println("We've been asked to stop."); 
     696        SafeProcess.log("@@@ Interruption to SafeProcess run by GShell.runLocal()"); 
    694697        setStatus(CANCELLED); // expected exception 
    695698    } else { 
     
    706709    // Each instance of this class is run in its own thread by class SafeProcess.InputGobbler. 
    707710    // This class deals with each incoming line from the perl process' stderr or stdout streams. One 
    708     // instance of this class for each stream. However, since multiple instances of this CustomProcessHandler 
    709     // could be (and in fact, are) writing to the same file in their own threads, several objects, not just 
    710     // the bufferedwriter object, needed to be made threadsafe. 
    711     // This class also handles exceptions during the running of the perl process. 
    712     // The runPerlCommand code originally would do a sendProcessStatus on each exception, so we ensure 
     711    // instance of this class for each stream. However, since multiple instances of this LineByLineHandler 
     712    // are firing off events to the same outputstream object (bos) in their own threads, several objects, 
     713    // not just bost, needed to be made threadsafe. So gotLine needs to be synchronized, and created a 
     714    // synchronized setStatus() method above too. 
     715    // This class also handles exceptions that may occur when reading from the perl process' stderr or stout. 
     716    // The old GShell.runLocal() code would set the GShell.status to ERROR on each exception, so we ensure 
    713717    // we do that here too, to continue original behaviour. These calls are also synchronized to make their 
    714718    // use of the EventListeners threadsafe. 
    715     protected class SynchronizedProcessHandler extends SafeProcess.CustomProcessHandler 
     719    protected class SynchronizedLineByLineHandler extends SafeProcess.LineByLineHandler 
    716720    { 
    717721    private final BufferedOutputStream bos; // needs to be final to be able to synchronize on the shared object 
    718722     
    719     public SynchronizedProcessHandler(BufferedOutputStream bos, int src) { 
     723    public SynchronizedLineByLineHandler(BufferedOutputStream bos, int src) { 
    720724        super(src); // will set this.source to STDERR or STDOUT 
    721725        this.bos = bos; // caller will close bw, since many more than one 
    722                             // SynchronizedProcessHandlers are using it 
     726                            // SynchronizedLineByLineHandlers are using it 
    723727    } 
    724728 
    725729    // trying to keep synchronized methods as short as possible 
    726     private void logException(Exception e) { 
    727         String stream = (this.source == SafeProcess.STDERR) ? "stderr" : "stdout"; 
    728         String msg = "Got exception when processing the perl process' " + stream + " stream."; 
     730    public void gotException(Exception e) { 
     731        String msg = "Got exception when processing the perl process' " + SafeProcess.streamToString(this.source) + " stream."; 
    729732 
    730733        DebugStream.println(msg); // method is already synchronized 
     
    732735        SafeProcess.log("*** " + msg, e); 
    733736 
    734         GShell.this.setStatus(GShell.ERROR); 
     737        GShell.this.setStatus(GShell.ERROR); // synchronized 
    735738    } 
    736739 
     
    740743    } 
    741744 
    742     public void run(Closeable inputStream) { 
    743         InputStream is = (InputStream) inputStream; 
    744  
    745         BufferedReader br = null; 
    746         try {        
    747  
    748         br = new BufferedReader(new InputStreamReader(is, "UTF-8")); 
    749         String line=null; 
    750  
    751         ///while ( !GShell.this.prcs.processRunning() || hasSignalledStop()) { // need to sync on prcs 
    752         while ( !Thread.currentThread().isInterrupted() && (line = br.readLine()) != null ) { 
    753             this.gotLine(line); // synchronized          
    754         }        
    755  
    756         } catch (Exception e) { 
    757         logException(e); 
    758         } finally { 
    759         System.err.println("*********"); 
    760         if(Thread.currentThread().isInterrupted()) { 
    761             log("We've been asked to stop."); 
    762              
    763         } 
    764         SafeProcess.closeResource(br); 
    765         String stream = (this.source == SafeProcess.STDERR) ? "stderr" : "stdout"; 
    766         System.err.println("*** In finally of " + stream + " stream"); 
    767         System.err.println("*********"); 
    768         } 
    769     } 
    770  
    771     protected synchronized void gotLine(String line) { 
     745    // every time we read a line from the SafeProcess' stderr or stdout stream we come here. 
     746    // a different isntance of SynchronizedL 
     747    public synchronized void gotLine(String line) { 
    772748        fireMessage(GShell.this.type,   // not final, needs synchro 
    773749            GShell.this.typeAsString(type) + "> " + line, 
  • main/trunk/gli/src/org/greenstone/gatherer/util/SafeProcess.java

    r31638 r31639  
    2121 
    2222public class SafeProcess { 
     23    //public static int DEBUG = 0; 
    2324 
    2425    public static final int STDERR = 0; 
     
    99100    public void setSplitStdErrorNewLines(boolean split) { 
    100101    splitStdErrorNewLines = split; 
    101     } 
    102  
    103     // logger and DebugStream print commands are synchronized, therefore thread safe. 
    104     public static void log(String msg) { 
    105     //logger.info(msg); 
    106  
    107     System.err.println(msg); 
    108  
    109     //DebugStream.println(msg); 
    110     } 
    111  
    112     public static void log(String msg, Exception e) { // Print stack trace on the exception 
    113     //logger.error(msg, e); 
    114  
    115     System.err.println(msg); 
    116     e.printStackTrace(); 
    117  
    118     //DebugStream.println(msg); 
    119     //DebugStream.printStackTrace(e); 
    120     } 
    121  
    122     public static void log(Exception e) { 
    123     //logger.error(e); 
    124  
    125     e.printStackTrace(); 
    126  
    127     //DebugStream.printStackTrace(e); 
    128     } 
    129  
    130     public static void log(String msg, Exception e, boolean printStackTrace) { 
    131     if(printStackTrace) {  
    132         log(msg, e); 
    133     } else { 
    134         log(msg); 
    135     } 
    136102    } 
    137103 
     
    184150        this.exitValue = process.waitFor(); // can throw an InterruptedException if process did not terminate 
    185151    } catch(InterruptedException ie) { 
    186          
    187152        log("*** Process interrupted (InterruptedException). Expected to be a Cancel operation."); 
    188153            // don't print stacktrace: an interrupt here is not an error, it's expected to be a cancel action 
     154        if(exceptionHandler != null) {       
     155        exceptionHandler.gotException(ie); 
     156        }            
    189157         
    190158        // propagate interrupts to worker threads here 
     
    204172        // to swallow the interrupt this time and not let it propagate by commenting out the next line? 
    205173        //Thread.currentThread().interrupt(); // re-interrupt the thread 
    206          
     174 
    207175        inputGobbler.interrupt(); 
    208176        errorGobbler.interrupt(); 
    209177        outputGobbler.interrupt(); 
    210          
     178 
     179        // even after the interrupts, we want to proceed to calling join() on all the worker threads 
     180        // in order to wait for each of them to die before attempting to destroy the process if it 
     181        // still hasn't terminated after all that. 
    211182    } finally {      
    212183         
     
    358329     
    359330    } finally {  
    360  
    361         log("*** In finally of SafeProcess.runProcess(3 params)"); 
     331        //String cmd = (this.command == null) ? Arrays.toString(this.command_args) : this.command; 
     332        //log("*** In finally of SafeProcess.runProcess(3 params): " + cmd); 
    362333 
    363334        if( process != null ) { 
     
    440411        // http://www.javaworld.com/article/2071275/core-java/when-runtime-exec---won-t.html?page=2 
    441412        // http://mark.koli.ch/leaky-pipes-remember-to-close-your-streams-when-using-javas-runtimegetruntimeexec         
     413         
     414        //String cmd = (this.command == null) ? Arrays.toString(this.command_args) : this.command; 
     415        //log("*** In finally of SafeProcess.runProcess(2 params): " + cmd); 
     416 
    442417        if( process != null ) { 
    443418        log("*** Going to call process.destroy 1"); 
     
    446421        log("*** Have called process.destroy 1"); 
    447422        } 
    448         log("*** In finally of SafeProcess.runProcess(2 params)"); 
    449423    } 
    450424     
     
    484458    protected CustomProcessHandler(int src) { 
    485459    this.source = src; // STDERR or STDOUT or STDIN 
    486  
    487     // modify threadname to prefix stream src (stdin/stderr/stdout) 
    488     // Useful for debugging if thread is named clearly 
    489     String stream; 
    490     switch(src) { 
    491     case SafeProcess.STDERR: 
    492        stream = "stderr";  
    493     case SafeProcess.STDOUT: 
    494         stream = "stdout"; 
    495     default: 
    496         stream = "stdin"; 
    497     } 
    498     Thread.currentThread().setName(stream + Thread.currentThread().getName()); 
     460    } 
     461 
     462    public String getThreadNamePrefix() { 
     463    return SafeProcess.streamToString(this.source);  
    499464    } 
    500465     
     
    510475    protected LineByLineHandler(int src) { 
    511476    this.source = src; // STDERR or STDOUT 
     477    } 
     478 
     479    public String getThreadNamePrefix() { 
     480    return SafeProcess.streamToString(this.source);  
    512481    } 
    513482 
     
    530499    private LineByLineHandler lineByLineHandler = null; 
    531500 
     501    protected InputStreamGobbler() { 
     502    super("InputStreamGobbler"); 
     503    } 
     504 
    532505    public InputStreamGobbler(InputStream is) 
    533506    { 
    534     super("InputStreamGobbler"); // thread name 
     507    this(); // sets thread name 
    535508    this.is = is; 
    536509    this.split_newlines = false; 
     
    539512    public InputStreamGobbler(InputStream is, boolean split_newlines) 
    540513    { 
    541     super("InputStreamGobbler"); // thread name 
     514    this(); // sets thread name 
    542515    this.is = is; 
    543516    this.split_newlines = split_newlines; 
     
    546519    public InputStreamGobbler(InputStream is, CustomProcessHandler customHandler) 
    547520    { 
    548     super("InputStreamGobbler"); // thread name 
     521    this(); // thread name 
    549522    this.is = is; 
    550523    this.customHandler = customHandler; 
     524    this.adjustThreadName(customHandler.getThreadNamePrefix()); 
     525    } 
     526 
     527     
     528    private void adjustThreadName(String prefix) {   
     529    this.setName(prefix + this.getName()); 
    551530    } 
    552531 
    553532    public void setLineByLineHandler(LineByLineHandler lblHandler) { 
    554533    this.lineByLineHandler = lblHandler; 
     534    this.adjustThreadName(lblHandler.getThreadNamePrefix()); 
    555535    } 
    556536 
     
    579559        lineByLineHandler.gotException(ioe); 
    580560        } else { 
    581         log("Exception when reading from a process' stdout/stderr stream: ", ioe); 
     561        log("Exception when reading process stream with " + this.getName() + ": ", ioe); 
    582562        } 
    583563    } finally { 
    584         log("*********"); 
    585564        if(this.isInterrupted()) { 
    586         log("We've been asked to stop.");        
     565        log("@@@ Successfully interrupted " + this.getName() + "."); 
    587566        } 
    588567        SafeProcess.closeResource(br); 
    589         log("*** In finally of " + this.getName()); 
    590         log("*********"); 
    591568    } 
    592569    } 
     
    619596    private CustomProcessHandler customHandler = null; 
    620597 
     598    protected OutputStreamGobbler() { 
     599    super("stdinOutputStreamGobbler"); // thread name 
     600    } 
     601 
    621602    public OutputStreamGobbler(OutputStream os) { 
    622     super("OutputStreamGobbler"); // thread name 
     603    this(); // set thread name 
    623604    this.os = os; 
    624605    } 
     
    626607    public OutputStreamGobbler(OutputStream os, String inputstr) 
    627608    { 
    628     super("OutputStreamGobbler"); // thread name 
     609    this(); // set thread name 
    629610    this.os = os; 
    630611    this.inputstr = inputstr; 
     
    632613 
    633614    public OutputStreamGobbler(OutputStream os, CustomProcessHandler customHandler) { 
    634     super("OutputStreamGobbler"); // thread name 
     615    this(); // set thread name 
    635616    this.os = os; 
    636617    this.customHandler = customHandler; 
     
    692673} // end static inner class OutputStreamGobbler 
    693674 
     675//**************** Static methods **************// 
     676 
     677 
     678    // logger and DebugStream print commands are synchronized, therefore thread safe. 
     679    public static void log(String msg) { 
     680    //logger.info(msg); 
     681 
     682    System.err.println(msg); 
     683 
     684    //DebugStream.println(msg); 
     685    } 
     686 
     687    public static void log(String msg, Exception e) { // Print stack trace on the exception 
     688    //logger.error(msg, e); 
     689 
     690    System.err.println(msg); 
     691    e.printStackTrace(); 
     692 
     693    //DebugStream.println(msg); 
     694    //DebugStream.printStackTrace(e); 
     695    } 
     696 
     697    public static void log(Exception e) { 
     698    //logger.error(e); 
     699 
     700    e.printStackTrace(); 
     701 
     702    //DebugStream.printStackTrace(e); 
     703    } 
     704 
     705    public static void log(String msg, Exception e, boolean printStackTrace) { 
     706    if(printStackTrace) {  
     707        log(msg, e); 
     708    } else { 
     709        log(msg); 
     710    } 
     711    } 
     712 
     713    public static String streamToString(int src) { 
     714    String stream; 
     715    switch(src) { 
     716    case STDERR: 
     717        stream = "stderr";  
     718        break; 
     719    case STDOUT: 
     720        stream = "stdout"; 
     721        break; 
     722    default: 
     723        stream = "stdin"; 
     724    } 
     725    return stream; 
     726    } 
    694727 
    695728//**************** Useful static methods. Copied from GLI's Utility.java ******************