Changeset 31639


Ignore:
Timestamp:
05/01/17 20:07:17 (4 years ago)
Author:
ak19
Message:
  1. Correcting switch statement in SafeProcess and tidying up some debugging. 2. GShell needs to have its own LineByLineHandlers not CustomProcessHandlers, since most of the err/out stream processing code should be the default provided by SafeProcess. 3. GShell.gotException wasn't being called during an expected InterruptedException on Cancel, so corrected that to make the code write a message to the DebugStream as the original code did when a build operation had been cancelled.
Location:
main/trunk/gli/src/org/greenstone/gatherer
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • main/trunk/gli/src/org/greenstone/gatherer/shell/GShell.java

    r31638 r31639  
    152152       } */
    153153
     154    // This method is only used by old_runLocal(). If that is removed, then remove this too
    154155    protected StringBuffer get_stream_char(InputStreamReader isr, StringBuffer line_buffer,
    155156                   BufferedOutputStream bos) throws IOException
     
    224225    // old_runLocal uses a potentially unsafe way of running a process and an inefficient way of reading
    225226    // from the process stdout and stderr streams. Replaced by new runLocal() which uses SafeProcess and
    226     // the new CustomProcessHandler inner class instantiated for each of the 2 process output streams.
     227    // the new LineByLineHandler inner class instantiated for each of the 2 process output streams.
     228    // If this method is ever removed from the file, can remove its helper get_stream_char() too.
    227229    private void old_runLocal(String[] args, BufferedOutputStream bos)
    228230    {
     
    354356   
    355357    prcs = new SafeProcess(args);
    356     SafeProcess.CustomProcessHandler processOutHandler
    357         = new SynchronizedProcessHandler(bos, SafeProcess.STDOUT);
    358     SafeProcess.CustomProcessHandler processErrHandler
    359         = new SynchronizedProcessHandler(bos, SafeProcess.STDERR);
     358    SafeProcess.LineByLineHandler processOutLineHandler
     359        = new SynchronizedLineByLineHandler(bos, SafeProcess.STDOUT);
     360    SafeProcess.LineByLineHandler processErrLineHandler
     361        = new SynchronizedLineByLineHandler(bos, SafeProcess.STDERR);
    360362   
    361363    prcs.setExceptionHandler(this);
    362     int exitValue = prcs.runProcess(null, processOutHandler, processErrHandler); // use default procIn handling
     364    int exitValue = prcs.runProcess(processOutLineHandler, processErrLineHandler); // use default procIn handling
    363365   
    364366    if(exitValue == 0) {
    365         System.err.println("*** Exited normally");
     367        //System.err.println("*** Exitted normally");
    366368        status = OK;
    367369        fireMessage(type, typeAsString(type) + "> " + Dictionary.get("GShell.Success"), status, null);
    368370    }
    369371    else {
    370         System.err.println("*** Exited abnormally with exit value " + exitValue);
     372        //System.err.println("*** Exitted abnormally with exit value " + exitValue);
    371373        status = ERROR;
    372374        fireMessage(type, typeAsString(type) + "> " + Dictionary.get("GShell.Failure"), status, null);
     
    385387
    386388        this.interrupt(); // interrupt this thread which is running the SafeProcess prcs
    387         // this will propagate the CANCEL status
     389        // this will propagate the CANCEL status to any worker threads launched by the SafeProcess
    388390        prcs = null;       
    389391    }
    390     hasSignalledStop(); // synchronized. Will set status to CANCELLED in thread-safe manner.   
     392    setStatus(CANCELLED); //or: hasSignalledStop(); // synchronized. Either will set status to CANCELLED in thread-safe manner.
    391393    }
    392394   
     
    691693
    692694    if(e instanceof InterruptedException) {
    693         SafeProcess.log("*** InterruptedException in GShell.runLocal()");
     695        DebugStream.println("We've been asked to stop.");
     696        SafeProcess.log("@@@ Interruption to SafeProcess run by GShell.runLocal()");
    694697        setStatus(CANCELLED); // expected exception
    695698    } else {
     
    706709    // Each instance of this class is run in its own thread by class SafeProcess.InputGobbler.
    707710    // This class deals with each incoming line from the perl process' stderr or stdout streams. One
    708     // instance of this class for each stream. However, since multiple instances of this CustomProcessHandler
    709     // could be (and in fact, are) writing to the same file in their own threads, several objects, not just
    710     // the bufferedwriter object, needed to be made threadsafe.
    711     // This class also handles exceptions during the running of the perl process.
    712     // The runPerlCommand code originally would do a sendProcessStatus on each exception, so we ensure
     711    // instance of this class for each stream. However, since multiple instances of this LineByLineHandler
     712    // are firing off events to the same outputstream object (bos) in their own threads, several objects,
     713    // not just bost, needed to be made threadsafe. So gotLine needs to be synchronized, and created a
     714    // synchronized setStatus() method above too.
     715    // This class also handles exceptions that may occur when reading from the perl process' stderr or stout.
     716    // The old GShell.runLocal() code would set the GShell.status to ERROR on each exception, so we ensure
    713717    // we do that here too, to continue original behaviour. These calls are also synchronized to make their
    714718    // use of the EventListeners threadsafe.
    715     protected class SynchronizedProcessHandler extends SafeProcess.CustomProcessHandler
     719    protected class SynchronizedLineByLineHandler extends SafeProcess.LineByLineHandler
    716720    {
    717721    private final BufferedOutputStream bos; // needs to be final to be able to synchronize on the shared object
    718722   
    719     public SynchronizedProcessHandler(BufferedOutputStream bos, int src) {
     723    public SynchronizedLineByLineHandler(BufferedOutputStream bos, int src) {
    720724        super(src); // will set this.source to STDERR or STDOUT
    721725        this.bos = bos; // caller will close bw, since many more than one
    722                             // SynchronizedProcessHandlers are using it
     726                            // SynchronizedLineByLineHandlers are using it
    723727    }
    724728
    725729    // trying to keep synchronized methods as short as possible
    726     private void logException(Exception e) {
    727         String stream = (this.source == SafeProcess.STDERR) ? "stderr" : "stdout";
    728         String msg = "Got exception when processing the perl process' " + stream + " stream.";
     730    public void gotException(Exception e) {
     731        String msg = "Got exception when processing the perl process' " + SafeProcess.streamToString(this.source) + " stream.";
    729732
    730733        DebugStream.println(msg); // method is already synchronized
     
    732735        SafeProcess.log("*** " + msg, e);
    733736
    734         GShell.this.setStatus(GShell.ERROR);
     737        GShell.this.setStatus(GShell.ERROR); // synchronized
    735738    }
    736739
     
    740743    }
    741744
    742     public void run(Closeable inputStream) {
    743         InputStream is = (InputStream) inputStream;
    744 
    745         BufferedReader br = null;
    746         try {       
    747 
    748         br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
    749         String line=null;
    750 
    751         ///while ( !GShell.this.prcs.processRunning() || hasSignalledStop()) { // need to sync on prcs
    752         while ( !Thread.currentThread().isInterrupted() && (line = br.readLine()) != null ) {
    753             this.gotLine(line); // synchronized         
    754         }       
    755 
    756         } catch (Exception e) {
    757         logException(e);
    758         } finally {
    759         System.err.println("*********");
    760         if(Thread.currentThread().isInterrupted()) {
    761             log("We've been asked to stop.");
    762            
    763         }
    764         SafeProcess.closeResource(br);
    765         String stream = (this.source == SafeProcess.STDERR) ? "stderr" : "stdout";
    766         System.err.println("*** In finally of " + stream + " stream");
    767         System.err.println("*********");
    768         }
    769     }
    770 
    771     protected synchronized void gotLine(String line) {
     745    // every time we read a line from the SafeProcess' stderr or stdout stream we come here.
     746    // a different isntance of SynchronizedL
     747    public synchronized void gotLine(String line) {
    772748        fireMessage(GShell.this.type,   // not final, needs synchro
    773749            GShell.this.typeAsString(type) + "> " + line,
  • main/trunk/gli/src/org/greenstone/gatherer/util/SafeProcess.java

    r31638 r31639  
    2121
    2222public class SafeProcess {
     23    //public static int DEBUG = 0;
    2324
    2425    public static final int STDERR = 0;
     
    99100    public void setSplitStdErrorNewLines(boolean split) {
    100101    splitStdErrorNewLines = split;
    101     }
    102 
    103     // logger and DebugStream print commands are synchronized, therefore thread safe.
    104     public static void log(String msg) {
    105     //logger.info(msg);
    106 
    107     System.err.println(msg);
    108 
    109     //DebugStream.println(msg);
    110     }
    111 
    112     public static void log(String msg, Exception e) { // Print stack trace on the exception
    113     //logger.error(msg, e);
    114 
    115     System.err.println(msg);
    116     e.printStackTrace();
    117 
    118     //DebugStream.println(msg);
    119     //DebugStream.printStackTrace(e);
    120     }
    121 
    122     public static void log(Exception e) {
    123     //logger.error(e);
    124 
    125     e.printStackTrace();
    126 
    127     //DebugStream.printStackTrace(e);
    128     }
    129 
    130     public static void log(String msg, Exception e, boolean printStackTrace) {
    131     if(printStackTrace) {
    132         log(msg, e);
    133     } else {
    134         log(msg);
    135     }
    136102    }
    137103
     
    184150        this.exitValue = process.waitFor(); // can throw an InterruptedException if process did not terminate
    185151    } catch(InterruptedException ie) {
    186        
    187152        log("*** Process interrupted (InterruptedException). Expected to be a Cancel operation.");
    188153            // don't print stacktrace: an interrupt here is not an error, it's expected to be a cancel action
     154        if(exceptionHandler != null) {     
     155        exceptionHandler.gotException(ie);
     156        }           
    189157       
    190158        // propagate interrupts to worker threads here
     
    204172        // to swallow the interrupt this time and not let it propagate by commenting out the next line?
    205173        //Thread.currentThread().interrupt(); // re-interrupt the thread
    206        
     174
    207175        inputGobbler.interrupt();
    208176        errorGobbler.interrupt();
    209177        outputGobbler.interrupt();
    210        
     178
     179        // even after the interrupts, we want to proceed to calling join() on all the worker threads
     180        // in order to wait for each of them to die before attempting to destroy the process if it
     181        // still hasn't terminated after all that.
    211182    } finally {     
    212183       
     
    358329   
    359330    } finally {
    360 
    361         log("*** In finally of SafeProcess.runProcess(3 params)");
     331        //String cmd = (this.command == null) ? Arrays.toString(this.command_args) : this.command;
     332        //log("*** In finally of SafeProcess.runProcess(3 params): " + cmd);
    362333
    363334        if( process != null ) {
     
    440411        // http://www.javaworld.com/article/2071275/core-java/when-runtime-exec---won-t.html?page=2
    441412        // http://mark.koli.ch/leaky-pipes-remember-to-close-your-streams-when-using-javas-runtimegetruntimeexec       
     413       
     414        //String cmd = (this.command == null) ? Arrays.toString(this.command_args) : this.command;
     415        //log("*** In finally of SafeProcess.runProcess(2 params): " + cmd);
     416
    442417        if( process != null ) {
    443418        log("*** Going to call process.destroy 1");
     
    446421        log("*** Have called process.destroy 1");
    447422        }
    448         log("*** In finally of SafeProcess.runProcess(2 params)");
    449423    }
    450424   
     
    484458    protected CustomProcessHandler(int src) {
    485459    this.source = src; // STDERR or STDOUT or STDIN
    486 
    487     // modify threadname to prefix stream src (stdin/stderr/stdout)
    488     // Useful for debugging if thread is named clearly
    489     String stream;
    490     switch(src) {
    491     case SafeProcess.STDERR:
    492        stream = "stderr";
    493     case SafeProcess.STDOUT:
    494         stream = "stdout";
    495     default:
    496         stream = "stdin";
    497     }
    498     Thread.currentThread().setName(stream + Thread.currentThread().getName());
     460    }
     461
     462    public String getThreadNamePrefix() {
     463    return SafeProcess.streamToString(this.source);
    499464    }
    500465   
     
    510475    protected LineByLineHandler(int src) {
    511476    this.source = src; // STDERR or STDOUT
     477    }
     478
     479    public String getThreadNamePrefix() {
     480    return SafeProcess.streamToString(this.source);
    512481    }
    513482
     
    530499    private LineByLineHandler lineByLineHandler = null;
    531500
     501    protected InputStreamGobbler() {
     502    super("InputStreamGobbler");
     503    }
     504
    532505    public InputStreamGobbler(InputStream is)
    533506    {
    534     super("InputStreamGobbler"); // thread name
     507    this(); // sets thread name
    535508    this.is = is;
    536509    this.split_newlines = false;
     
    539512    public InputStreamGobbler(InputStream is, boolean split_newlines)
    540513    {
    541     super("InputStreamGobbler"); // thread name
     514    this(); // sets thread name
    542515    this.is = is;
    543516    this.split_newlines = split_newlines;
     
    546519    public InputStreamGobbler(InputStream is, CustomProcessHandler customHandler)
    547520    {
    548     super("InputStreamGobbler"); // thread name
     521    this(); // thread name
    549522    this.is = is;
    550523    this.customHandler = customHandler;
     524    this.adjustThreadName(customHandler.getThreadNamePrefix());
     525    }
     526
     527   
     528    private void adjustThreadName(String prefix) { 
     529    this.setName(prefix + this.getName());
    551530    }
    552531
    553532    public void setLineByLineHandler(LineByLineHandler lblHandler) {
    554533    this.lineByLineHandler = lblHandler;
     534    this.adjustThreadName(lblHandler.getThreadNamePrefix());
    555535    }
    556536
     
    579559        lineByLineHandler.gotException(ioe);
    580560        } else {
    581         log("Exception when reading from a process' stdout/stderr stream: ", ioe);
     561        log("Exception when reading process stream with " + this.getName() + ": ", ioe);
    582562        }
    583563    } finally {
    584         log("*********");
    585564        if(this.isInterrupted()) {
    586         log("We've been asked to stop.");       
     565        log("@@@ Successfully interrupted " + this.getName() + ".");
    587566        }
    588567        SafeProcess.closeResource(br);
    589         log("*** In finally of " + this.getName());
    590         log("*********");
    591568    }
    592569    }
     
    619596    private CustomProcessHandler customHandler = null;
    620597
     598    protected OutputStreamGobbler() {
     599    super("stdinOutputStreamGobbler"); // thread name
     600    }
     601
    621602    public OutputStreamGobbler(OutputStream os) {
    622     super("OutputStreamGobbler"); // thread name
     603    this(); // set thread name
    623604    this.os = os;
    624605    }
     
    626607    public OutputStreamGobbler(OutputStream os, String inputstr)
    627608    {
    628     super("OutputStreamGobbler"); // thread name
     609    this(); // set thread name
    629610    this.os = os;
    630611    this.inputstr = inputstr;
     
    632613
    633614    public OutputStreamGobbler(OutputStream os, CustomProcessHandler customHandler) {
    634     super("OutputStreamGobbler"); // thread name
     615    this(); // set thread name
    635616    this.os = os;
    636617    this.customHandler = customHandler;
     
    692673} // end static inner class OutputStreamGobbler
    693674
     675//**************** Static methods **************//
     676
     677
     678    // logger and DebugStream print commands are synchronized, therefore thread safe.
     679    public static void log(String msg) {
     680    //logger.info(msg);
     681
     682    System.err.println(msg);
     683
     684    //DebugStream.println(msg);
     685    }
     686
     687    public static void log(String msg, Exception e) { // Print stack trace on the exception
     688    //logger.error(msg, e);
     689
     690    System.err.println(msg);
     691    e.printStackTrace();
     692
     693    //DebugStream.println(msg);
     694    //DebugStream.printStackTrace(e);
     695    }
     696
     697    public static void log(Exception e) {
     698    //logger.error(e);
     699
     700    e.printStackTrace();
     701
     702    //DebugStream.printStackTrace(e);
     703    }
     704
     705    public static void log(String msg, Exception e, boolean printStackTrace) {
     706    if(printStackTrace) {
     707        log(msg, e);
     708    } else {
     709        log(msg);
     710    }
     711    }
     712
     713    public static String streamToString(int src) {
     714    String stream;
     715    switch(src) {
     716    case STDERR:
     717        stream = "stderr";
     718        break;
     719    case STDOUT:
     720        stream = "stdout";
     721        break;
     722    default:
     723        stream = "stdin";
     724    }
     725    return stream;
     726    }
    694727
    695728//**************** Useful static methods. Copied from GLI's Utility.java ******************
Note: See TracChangeset for help on using the changeset viewer.