Ignore:
Timestamp:
2017-05-18T20:38:56+12:00 (7 years ago)
Author:
ak19
Message:

Major changes to SafeProcess and classes of the download package, by bringing the final GLI (and final Greenstone) class DownloadJob over to use SafeProcess. Significant changes to SafeProcess: 1. Introduction of cancelRunningProcess as a new method, so that callers don't need to know how cancelling a process is implemented (as an interrupt) nor do they need to know what thread they ought to interrupt (which should be the thread that launched SafeProcess), nor do they need to know of the complexity surrounding the join() blocking call which should not be interrupted. 2. Introduction of the SafeProcess.MainHandler interface that provides methods that allow implementers to write hooks to various stages of the SafeProcess' internal process' life cycle. 3. moved process cleanUp() code into a reusable method within SafeProcess. Significant changes to DownloadJob and its associated DownloadProgressBar and DownloadScrollPane classes: 1. Unused member vars removed or commented out and those that can be declared final are now declared so, as a programming principle for thread safety, since DownloadJob and the other download classes will have to interact with multiple threads and could be called by different threads. 2. Replaced existing actionPerformed() and callDownload() of DownloadJob with SafeProcess and implemented the necessary Hooks in the SafeProcess.MainHandler() interface to ensure that perl is still told to terminate wget on a cancel action. 3. Replaced DownloadScrollPane's deleteDownloadJob() with a new version that now more responsively removes its DownloadProgressBar (with controls) for a DownloadJob. It's called by the SafeProcess.MainHandler interface hooks implemented by DownloadJob, so DownloadScrollPane no longer needs to wait() to be notify()ed when a process has cleaned up on premature termination by a cancel action. 4. Made the necessary methods in DownloadProgressBar synchronized for thread safety. 5. 
GShell now uses SafeProcess' new cancelRunningProcess() method in place of directly calling interrupt on the (GShell) thread that launched SafeProcess.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • main/trunk/gli/src/org/greenstone/gatherer/download/DownloadJob.java

    r22103 r31692  
    4242import java.util.*;
    4343import javax.swing.tree.*;
     44import javax.swing.SwingUtilities;
    4445import org.greenstone.gatherer.Configuration;
    4546import org.greenstone.gatherer.DebugStream;
     
    5051import org.greenstone.gatherer.file.WorkspaceTree;
    5152import org.greenstone.gatherer.util.AppendLineOnlyFileDocument;
     53import org.greenstone.gatherer.util.SafeProcess;
    5254import org.greenstone.gatherer.util.Utility;
    5355import org.greenstone.gatherer.cdm.Argument;
    5456import org.greenstone.gatherer.collection.*;
     57
    5558/**
    5659 * @author John Thompson, Greenstone Digital Library, University of Waikato
    5760 * @version 2.0
     61 * When modifying this class, bear in mind concurrency issues that could arise with
     62 * SafeProcess's worker threads and where synchronization may be needed to prevent such issues.
    5863 */
    5964public class DownloadJob
    60     implements ActionListener {
    61      
    62     private boolean debug;
    63     private boolean higher_directories;
    64     private boolean no_parents;
    65     private boolean other_hosts;
    66     private boolean page_requisites;
    67     private boolean quiet;
     65    implements ActionListener, SafeProcess.MainProcessHandler {
    6866
    6967    private AppendLineOnlyFileDocument download_log;
     
    7169    private DownloadProgressBar progress;
    7270
    73     private int depth;
    7471    private int previous_state;
    7572    private int state;
    7673
    77     private String download_url = "";
     74    private SafeProcess prcs = null;
     75
     76    private final String download_url;
    7877   
    7978    //    private String current_url;
    8079    //    private String destination;
    81     private String proxy_pass;
    82     private String proxy_user;
    83 
    84     private Vector encountered_urls;
    85     private Vector failed_urls;
     80    private final String proxy_pass;
     81    private final String proxy_user;
     82
     83    //private final Vector encountered_urls;
     84    //private Vector failed_urls;
    8685    private Download download;
    8786    private DownloadScrollPane mummy;
    8887    private HashMap download_option;
    8988
    90     public static int COMPLETE = 0;
    91     public static int PAUSED   = 1;
    92     public static int RUNNING  = 2;
    93     public static int STOPPED  = 3;
    94 
    95     public static int UNKNOWN_MAX  = 0;
    96     public static int DEFINED_MAX  = 1;
    97     public static int UNDEFINED_MAX  = 2;
     89    public static final int COMPLETE = 0;
     90    public static final int PAUSED   = 1;
     91    public static final int RUNNING  = 2;
     92    public static final int STOPPED  = 3;
     93
     94    public static final int UNKNOWN_MAX  = 0;
     95    public static final int DEFINED_MAX  = 1;
     96    public static final int UNDEFINED_MAX  = 2;
    9897   
    9998    // To prematurely terminate wget, we will need to use sockets and find a free port.
     
    103102    private static int nextFreePort = PORT_BASE;  // Keep track what port numbers we have checked for availability
    104103    int port;                                     // package access. The socket port number this instance of DownloadJob will use
     104                                             // only the main thread (where DownloadJob runs) modifies port, so no synching needed
    105105   
    106     private String mode = null;
     106    private final String mode;
    107107   
    108     private String proxy_url;
     108    private String proxy_url; // only the main thread (where DownloadJob runs) modifies this, so no synching needed
    109109   
    110110    /**
     
    149149
    150150    progress = new DownloadProgressBar(this,download_url, true);
    151     encountered_urls = new Vector();
    152     failed_urls = new Vector();
     151    //encountered_urls = new Vector();
     152    //failed_urls = new Vector();
    153153
    154154    previous_state = STOPPED;
     
    167167    }
    168168
    169     /** Depending on which button on the progress bar was pushed,
     169    /** Depending on which button on the progress bar was pushed,
    170170     * this method will affect the state of the DownloadJob and perhaps make
    171171     * calls to wget.class if necessary.
     
    173173     * which we must respond to.
    174174     */
    175     public void actionPerformed(ActionEvent event) {
     175    public void old_actionPerformed(ActionEvent event) {
    176176    // The stop_start_button is used to alternately start or stop the
    177177    // job. If the current state of the job is paused then this
     
    195195    }
    196196    }
     197   
     198    /** Depending on which button on the progress bar was pushed,
     199     * this method will affect the state of the DownloadJob and perhaps make
     200     * calls to wget.class if necessary.
     201     * @param event The ActionEvent fired from within the DownloadProgressBar
     202     * which we must respond to.
     203     * Now using synchronized methods like previous_state = getState(); instead of
     204     * previous_state = state; and setState(STOPPED); instead of state = STOPPED;
     205     */
     206    public void actionPerformed(ActionEvent event) {
     207    // The stop_start_button is used to alternately start or stop the
     208    // job. If the current state of the job is paused then this
     209    // restart is logically equivalent to a resume.
     210    if(event.getSource() == progress.stop_start_button) {
     211        previous_state = getState();
     212        if (getState() == RUNNING) {
     213        //setState(STOPPED);
     214        stopDownload(); // cancels any running SafeProcess     
     215        } else {
     216        //previous_state = getState();
     217        setState(RUNNING);
     218        mummy.resumeThread();
     219        }
     220    }
     221    else if (event.getSource() == progress.close_button) {
     222        SafeProcess.log("@@@ Progress bar close button pressed");
     223        if(getState() == RUNNING) {
     224        previous_state = getState();
     225        //setState(STOPPED); // do we need to do anything else to stop this? YES, we do:
     226        stopDownload(); // cancels any running SafeProcess     
     227        }
     228        mummy.deleteDownloadJob(this);
     229    }
     230    }
    197231
    198232    /** Given a portnumber to check, returns true if it is available
     
    229263    }
    230264
    231     public void callDownload() {
     265    // If eschewing the use of SafeProcess, reactivate (by renaming) old_callDownload()
     266    // and old_actionPerformed(), and DownloadScrollPane.java's old_deleteDownloadJob().
     267    public void old_callDownload() {
    232268
    233269    ArrayList command_list = new ArrayList();
     
    537573    }
    538574
     575    public void callDownload() {
     576
     577    ArrayList command_list= new ArrayList();
     578   
     579    // the following also works for client-gli if downloading is enabled (when there's a gs2build directory inside gli)
     580    command_list.add(Configuration.perl_path);
     581    command_list.add("-S");
     582    command_list.add(LocalGreenstone.getBinScriptDirectoryPath()+"downloadfrom.pl");
     583    command_list.add("-download_mode");
     584    command_list.add(mode);
     585    command_list.add("-cache_dir");
     586    command_list.add(Gatherer.getGLIUserCacheDirectoryPath());
     587    // For the purposes of prematurely terminating wget from GLI (which creates a socket
     588    // as a communication channel between GLI and Perl), it is important to tell the script
     589    // that we're running as GLI. Because when running from the command prompt, it should
     590    // not create this socket and do the related processing.
     591    command_list.add("-gli");
     592   
     593    ArrayList all_arg = download.getArguments(true,false);
     594    for(int i = 0; i < all_arg.size(); i++) {
     595        Argument argument = (Argument) all_arg.get(i);
     596        if(argument.isAssigned()) {
     597        command_list.add("-" + argument.getName());
     598        if(argument.getType() != Argument.FLAG) {
     599            command_list.add(argument.getValue());
     600        }
     601        }   
     602    }
     603
     604    String [] cmd = (String []) command_list.toArray(new String[0]);
     605    DebugStream.println("Download job, "+command_list);
     606
     607    if (previous_state == DownloadJob.COMPLETE) {
     608        progress.mirrorBegun(true, true);
     609    }
     610    else {
     611        progress.mirrorBegun(false, true);
     612    }
     613
     614    try {
     615        Runtime rt = Runtime.getRuntime();
     616
     617        String [] env = null;
     618         
     619        if (Utility.isWindows()) {
     620                prcs = new SafeProcess(cmd);
     621        }
     622        else {
     623        if (proxy_url != null && !proxy_url.equals("")) {           
     624            // Specify proxies as environment variables
     625            // Need to manually specify GSDLHOME and GSDLOS also
     626            env = new String[4];
     627                    proxy_url = proxy_url.replaceAll("http://","");
     628            env[0] = "http_proxy=http://"+proxy_url;
     629            env[1] = "ftp_proxy=ftp://"+proxy_url;
     630            env[2] = "GSDLHOME=" + Configuration.gsdl_path;
     631            env[3] = "GSDLOS=" + Gatherer.client_operating_system;
     632
     633            prcs = new SafeProcess(cmd, env, null);
     634        }
     635        else if(Gatherer.isGsdlRemote && Gatherer.isDownloadEnabled) {
     636            // Not Windows, but running client with download panel
     637            // Need to manually specify GSDLHOME and GSDLOS
     638            env = new String[2];
     639            env[0] = "GSDLHOME=" + Configuration.gsdl_path;
     640            env[1] = "GSDLOS=" + Gatherer.client_operating_system;
     641
     642            prcs = new SafeProcess(cmd, env, null);
     643        }
     644        else {
     645            // Will inherit the GLI's environment, with GSDLHOME and GSDLOS set
     646            prcs = new SafeProcess(cmd);
     647        }
     648        }
     649        //System.out.println(newcmd);
     650        prcs.setMainHandler(this); // attach handler to clean up before and after process.destroy()
     651                                   // for which DownloadJob implements SafeProcess.MainProcessHandler
     652
     653        // To be able to stop Wget, we use sockets to communicate with the perl process that launched wget
     654        if (mode.equals("Web") || mode.equals("MediaWiki")) { // wget download modes other than OAI
     655       
     656        // Need to find an available (unused) port within the range we're looking for to pass it
     657        // the Perl child process, so that it may set up a listening ServerSocket at that port number
     658        try {
     659            boolean foundFreePort = false;
     660            for(int i = 0; i < PORT_BLOCK_SIZE; i++) {
     661
     662            if(isPortAvailable(nextFreePort)) {
     663                foundFreePort = true;
     664                break;
     665               
     666            } else {
     667                incrementNextFreePort();
     668            }
     669            }
     670           
     671            if(foundFreePort) {
     672            // Free port number currently found becomes the port number of the socket that this
     673            // DownloadJob instance will be connecting to when the user wants to prematurely stop Wget.
     674            this.port = nextFreePort;
     675            incrementNextFreePort(); //// Necessary?
     676           
     677            } else {
     678            throw new Exception("Cannot find an available port in the range "
     679                        + PORT_BASE + "-" + (PORT_BASE+PORT_BLOCK_SIZE)
     680                        + "\nwhich is necessary for forcibly terminating wget.");
     681            }
     682
     683            // Communicate the chosen port for this DownloadJob instance to the perl process, so
     684            // that it can set up a ServerSocket at that port to listen for any signal to terminate wget
     685            //OutputStream os = prcs.getOutputStream();
     686            String p = ""+this.port+"\n";
     687            System.err.println("Portnumber found: " + p);
     688
     689            prcs.setInputString(p);
     690         
     691        } catch(Exception ex) {
     692            System.err.println("Sent available portnumber " + this.port + " to process' outputstream.\nBut got exception: " + ex);
     693        }
     694        }
     695
     696        ProcessErrHandler errHandler = new ProcessErrHandler(); // meaningful output comes from prcs stderr
     697        ProcessOutHandler outHandler = new ProcessOutHandler(); // debugging output comes from prcs' stdout
     698
     699        int exitVal = prcs.runProcess(null, outHandler, errHandler);
     700       
     701        // if prcs is interrupted (cancelled) during the blocking runProcess() call,
     702        // as happens on state == STOPPED, then
     703        // beforeWaitingForStreamsToEnd() is called before the process' worker threads come to a halt
     704        // and afterStreamsEnded() is called when the process' worker threads have halted,
     705        // beforeProcessDestroy() is called before the process is destroyed,
     706        // and afterProcessDestroy() is called after the proc has been destroyed.
     707        // If, at the beforeWaitingForStreamsToEnd() stage, the perl process was still running but had
     708        // been told to stop, then the beforeWaitingForStreamsToEnd() method will make sure to communicate
     709        // with the perl process over a socket and send it the termination message,
     710        // which will also kill any running wget that perl launched.
     711        // In that case, destroy() is actually called on the process at last.
     712
     713    }
     714    catch (Exception ioe) {
     715        SafeProcess.log(ioe);
     716        DebugStream.printStackTrace(ioe);
     717    }
     718
     719    // now the process is done, we can at last null it
     720    prcs = null;
     721
     722    // If we've got to here and the state isn't STOPPED then the
     723    // job is complete.
     724    if(getState() == DownloadJob.RUNNING) {
     725        progress.mirrorComplete();
     726        previous_state = getState();
     727        setState(DownloadJob.COMPLETE);     
     728    }   
     729
     730    SafeProcess.log("@@@@ DONE callDownload()");
     731   
     732    /*
     733    // Regardless of whether state==STOPPED or ends up being COMPLETE, the process is at an end now.
     734    // Notify the DownloadScrollPane which is waiting on this job to complete that we are ready
     735    synchronized(this) {
     736        System.err.println("**************** Notifying download scrollpane");
     737        this.notify();
     738    }
     739    */ 
     740
     741    // refresh the workspace tree
     742    Gatherer.g_man.refreshWorkspaceTree(WorkspaceTree.DOWNLOADED_FILES_CHANGED);
     743    }
     744
     745    private synchronized boolean isStopped() { return state == STOPPED; }
     746
     747    // called when the user cancelled the download and we're told to stop both our external perl process
     748    // and the wget process that it in turn launched
     749    public void stopDownload() {   
     750    if(prcs != null) {     
     751        SafeProcess.log("@@@ Going to interrupt the SafeProcess...");       
     752
     753        // Whether a process ends naturally or is prematurely ended, beforeWaitingForStreamsToEnd()
     754        // will be called. We've hooked this in to calling tellPerlToTerminateWget() only if the
     755        // process is still running when cancel is pressed, but not when it's naturally terminated.
     756        boolean hadToSendInterrupt = prcs.cancelRunningProcess(); // returns false if it was already terminating/terminated, true if interrupt sent
     757
     758
     759        /*
     760          // if process terminating naturally but waiting for process' worker threads to join(),
     761          // shall we just remove the progress bar display for this download?
     762          // If so, do this section in place of the 2 calls to progress.enableCancelJob(boolean) below
     763        if(!hadToSendInterrupt && SwingUtilities.isEventDispatchThread()) {
     764        if(getState() == DownloadJob.RUNNING) {
     765            progress.mirrorComplete();
     766            previous_state = getState();
     767            setState(DownloadJob.COMPLETE);     
     768        }
     769        mummy.deleteCurrentDownloadJob(this); // why wait for the cleanup which can't be interrupted anyway?
     770        }
     771        */
     772    } else {
     773        System.err.println("@@@@ No process to interrupt");
     774    }
     775
     776    //setState(STOPPED); // would set it to stop on cancel, even if it already naturally terminated
     777   
     778    }
     779   
     780//*********** START of implementing interface SafeProcess.MainProcessHandler
     781    // before and after processDestroy only happen when interrupted AND terminatePerlScript=true
     782    public void beforeProcessDestroy() {}   
     783    public void afterProcessDestroy() {}
     784
     785    // after blocking call on closing up streamgobbler worker threads that happens
     786    // upon natural termination or interruption of process' main body/thread.
     787    // if not overriding, then return the parameter forciblyTerminating as-is
     788    public boolean afterStreamsEnded(boolean forciblyTerminating) { return forciblyTerminating; }
     789
     790    // called after the SafeProcess has fully terminated (naturally or via process.destroy())
     791    // and has been cleaned up
     792    public void doneCleanup(boolean wasForciblyTerminated) {
     793    // let the user know they can cancel again now cleanup phase is done
     794    progress.enableCancelJob(true);
     795   
     796    if(wasForciblyTerminated) {
     797        setState(STOPPED); // sets it to stop only if process truly was prematurely terminated, not merely
     798        // if the cancel button was clicked when it had already naturally terminated
     799
     800        // we're now ready to remove the display of the until now running job
     801        // from the download progress bar interface
     802        mummy.deleteCurrentDownloadJob(this);
     803    } /*else {
     804        // If we've got to here and the state isn't STOPPED then the
     805        // job is complete.
     806        System.err.println("**************** NOT Notifying download scrollpane");
     807        if(getState() == DownloadJob.RUNNING) {
     808        progress.mirrorComplete();
     809        previous_state = getState();
     810        setState(DownloadJob.COMPLETE);     
     811        }
     812    }
     813   
     814    // Regardless of whether state==STOPPED or ends up being COMPLETE, the process is at an end now.
     815    // Notify the DownloadScrollPane which is waiting on this job to complete that we are ready
     816    synchronized(this) {
     817        System.err.println("**************** Notifying download scrollpane");
     818        this.notify();
     819        }*/
     820
     821    }
     822
     823    // before blocking call of ending streamgobbler worker threads that happens
     824    // after process' main body/thread has naturally terminated or been interrupted
     825    public boolean beforeWaitingForStreamsToEnd(boolean forciblyTerminating) {
     826    // let the user know they can't cancel during cleanup phase
     827    progress.enableCancelJob(false);
     828
     829    SafeProcess.log("**** in beforeWaitingForStreamsToEnd()");
     830
     831    // state would not be STOPPED if cancel was pressed after the process naturally terminated anyway
     832    // in that case we don't need to send perl the signal to terminate WGET
     833    if(!forciblyTerminating) { //if(!isStopped()) {
     834        SafeProcess.log("*** Process not (yet) cancelled/state not (yet) stopped");
     835        SafeProcess.log("*** But process has naturally terminated (process streams are being closed before any interruption signal can be received), so won't be destroying process even on interrupt");
     836        return false; // for us to be in this method at all with forciblyTerminating being false
     837            // means the process is already naturally terminating, so don't unnaturally destroy it
     838    }
     839   
     840    // else the process is still running and we've been told to stop, so tell perl to stop wget first
     841    // (so that process destroy can then be called thereafter)
     842    return tellPerlToTerminateWget();
     843    }
     844//*********** END of implementing interface SafeProcess.MainProcessHandler
     845   
     846    public boolean tellPerlToTerminateWget() {
     847    SafeProcess.log("**** in tellPerlToTerminateWget()");
     848   
     849    boolean terminatePerlScript = true;
     850   
     851    // When GLI is working with wget-based download modes other than OAI (MediaWiki and Web
     852    // download) and the STOP button has been pressed, wget needs to be prematurely terminated.
     853    // Only wget download modes Web and MediaWiki require the use of sockets to communicate
     854    // with the perl script in order to get wget to terminate. Other download modes, including
     855    // wgetdownload mode OAI, can terminate in the traditional manner: close process inputstream
     856    // and kill perl process. OAI launches many wgets. So that when the perl process is terminated,
     857    // the currently running wget will finish off but other wgets are no longer launched.
     858    if((mode.equals("Web") || mode.equals("MediaWiki"))) {
     859        SafeProcess.log("@@@ Socket communication to end wget");
     860        // create a socket to the perl child process and communicate the STOP message
     861        Socket clientSocket = null;
     862        BufferedReader clientReader = null;
     863        OutputStream os = null;
     864       
     865        if(clientSocket == null) {
     866        try {
     867            clientSocket = new Socket("localhost", this.port); // connect to the port chosen for this DownloadJob instance
     868           
     869            clientReader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
     870            String response = clientReader.readLine(); // see if we've been connected
     871            System.err.println("Communicating with perl download script on port " + this.port
     872                       + "\nGot response from perl: " + response);
     873           
     874            // Send the STOP signal
     875            os = clientSocket.getOutputStream();
     876            String message = "<<STOP>>\n";
     877            os.write(message.getBytes());
     878            response = clientReader.readLine(); // see whether the stop signal has been received
     879            System.err.println("GLI sent STOP signal to perl to terminate wget."
     880                       + "\nGot response from perl: " + response);
     881           
     882            response = clientReader.readLine(); // see whether the perl script is ready to be terminated
     883            System.err.println("Got another response from perl: " + response);     
     884           
     885            if(response == null) { // why? Is it because the process has already terminated naturally if response is null?
     886            terminatePerlScript = false;
     887            }
     888        } catch(IOException ex) {
     889            if(ex instanceof IOException && ex.getMessage().indexOf("Connection refused") != -1) {
     890            terminatePerlScript = false; // no socket listening on other end because process ended
     891            System.err.println("Tried to communicate through client socket - port " + this.port + ", but the process seems to have already ended naturally");
     892            } else {
     893            System.err.println("Tried to communicate through client socket - port " + this.port + ", but got exception: " + ex);
     894            }
     895
     896        } catch(Exception ex) {
     897            System.err.println("Tried to open client socket, but got exception: " + ex);
     898        } finally {
     899            SafeProcess.closeResource(os);
     900            SafeProcess.closeResource(clientReader);
     901            SafeProcess.closeSocket(clientSocket); // close the clientSocket (the Perl end will close the server socket that Perl opened)
     902            os = null;
     903            clientReader = null;
     904            clientSocket = null;
     905        }
     906        }
     907    }
     908   
     909    return terminatePerlScript; // if true, it will call destroy() on the SafeProcess' process
     910    }
     911
    539912
    540913    /** Called by the WGet native code when the current download is
     
    542915     */
    543916    public void downloadComplete() {
    544     progress.downloadComplete();
     917    progress.downloadComplete(); // now this is synchronized
    545918    }
    546919
     
    548921    public void downloadComplete(String current_file_downloading)
    549922    {
    550     progress.downloadComplete();
     923    progress.downloadComplete(); // now this is synchronized
    551924    DebugStream.println("Download complete: " + current_file_downloading);
    552925    }
     
    558931    public void downloadFailed() {
    559932    // TODO!!
     933    //synchronized(failed_urls) {
    560934    //failed_urls.add(current_url); // It is the current url that failed
    561     progress.downloadFailed();
     935    //}
     936    progress.downloadFailed(); // now this is synchronized
    562937    //DebugStream.println("Download failed: " + current_url);
    563938    }
     
    566941     */
    567942    public void downloadWarning() {
    568     progress.downloadWarning();
     943    progress.downloadWarning(); // now this is synchronized
    569944    }
    570945
     
    584959     * @return An int representing the current DownloadJob state.
    585960     */
    586     public int getState() {
     961    public synchronized int getState() {
    587962    return state;
    588963    }
     
    592967     * stop.
    593968     */
    594     public boolean hasSignalledStop() {
     969    public synchronized boolean hasSignalledStop() {
    595970    if(state == DownloadJob.STOPPED || state == DownloadJob.PAUSED ||
    596971       state == DownloadJob.COMPLETE) {
     
    600975    }
    601976
    602     public void setState(int state) {
     977    public synchronized void setState(int state) {
    603978    previous_state = this.state;
    604979    this.state = state;
     
    624999
    6251000   
    626     // Inner thread class that reads from process downloadfrom.pl's errorstream
    627     private class PerlReaderThread extends Thread {
    628     Process prcs = null;
    629 
    630     public PerlReaderThread(Process proc) {
    631         this.prcs = proc;
    632     }
    633 
    634     public void run() {
     1001    /*
     1002      Go through https://docs.oracle.com/javase/tutorial/essential/concurrency/atomicvars.html series of
     1003      Java articles on concurrency again.
     1004      Go through http://docs.oracle.com/javase/tutorial/uiswing/concurrency/
     1005
     1006      http://stackoverflow.com/questions/574240/is-there-an-advantage-to-use-a-synchronized-method-instead-of-a-synchronized-blo
     1007
     1008       "Not only do synchronized methods not lock the whole class, but they don't lock the whole instance either. Unsynchronized methods in the class may still proceed on the instance."
     1009       "Only the syncronized methods are locked. If there are fields you use within synced methods that are accessed by unsynced methods, you can run into race conditions."
     1010       
     1011       "synchronizing on "this" is considered in some circles to be an anti-pattern. The unintended consequence is that outside of the class someone can lock on an object reference that is equal to "this" and prevent other threads from passing the barriers within the class potentially creating a deadlock situation. Creating a "private final Object = new Object();" variable purely for locking purposes is the often used solution. Here's another question relating directly to this issue. http://stackoverflow.com/questions/442564/avoid-synchronizedthis-in-java?lq=1"
     1012
     1013       "A private lock is a defensive mechanism, which is never a bad idea.
     1014
     1015       Also, as you alluded to, private locks can control granularity. One set of operations on an object might be totally unrelated to another but synchronized(this) will mutually exclude access to all of them."
     1016
     1017     http://stackoverflow.com/questions/8393883/is-synchronized-keyword-exception-safe
     1018     "In any scoped thread-safe block, the moment you get out of it, the thread-safety is gone."
     1019     "In case of an exception the lock will be released."
     1020
     1021     http://stackoverflow.com/questions/8259479/should-i-synchronize-listener-notifications-or-not
     1022     "Use a CopyOnWriteArrayList for your listener arrays."
     1023     "If you use the CopyOnWriteArrayList, then you don't have to synchronize when iterating."
     1024     "CopyOnWriteArrayList is thread-safe, so there is no need to synchronize."
     1025
     1026     "Use a ConcurrentLinkedQueue<Listener> ... for this kind of problems: adding, removing and iterating simultaneously on a collection.
     1027     A precision : this solution prevents a listener from being called from the very moment it is deregistered."
     1028     "It means that you start iterating, an element is added, it will be called, another is removed, it won't, all this in the same iteration cycle.
     1029     It's the best of both worlds: ensuring synchronization, while being fine grained on who gets called and who's not."
     1030
     1031     http://stackoverflow.com/questions/8260205/when-a-listener-is-removed-is-it-okay-that-the-event-be-called-on-that-listener
     1032
     1033     http://stackoverflow.com/questions/2282166/java-synchronizing-on-primitives
     1034     
     1035     1. You can't lock on a primitive and
     1036     2. Don't lock on a Long unless you're careful how you construct them. Long values created by autoboxing or Long.valueOf() in a certain range are guaranteed to be the same across the JVM which means other threads could be locking on the same exact Long object and giving you cross-talk. This can be a subtle concurrency bug (similar to locking on intern'ed strings).
     1037
     1038     Cross-talk:
     1039     "In electronics, crosstalk is any phenomenon by which a signal transmitted on one circuit or channel of a transmission system creates an undesired effect in another circuit or channel. Crosstalk is usually caused by undesired capacitive, inductive, or conductive coupling from one circuit, part of a circuit, or channel, to another."
     1040    */
     1041
     1042
     1043    // Inner handler class that reads from process downloadfrom.pl's std output stream
     1044    private class ProcessOutHandler extends SafeProcess.CustomProcessHandler {
     1045   
     1046    public ProcessOutHandler() {
     1047        super(SafeProcess.STDOUT);
     1048    }
     1049
     1050    public void run(Closeable stream) {
     1051        InputStream is = (InputStream) stream;
     1052        BufferedReader eReader = null;
    6351053        try {
    636         if(prcs != null) {
    637             String message = null;
    638             BufferedReader eReader = new BufferedReader(new InputStreamReader(prcs.getInputStream()));
    639             while(prcs != null && (message = eReader.readLine()) != null) {
    640             if(!message.equals("\n")) {
    641                 System.err.println("**** Perl STDOUT: " + message);
    642             }
    643             }
    644            
    645             if(prcs != null && eReader != null) {
    646             eReader.close();
    647             eReader = null;
    648             System.err.println("**** Perl ENDed.");
    649             }
    650         }
     1054       
     1055        String message = null;
     1056        eReader = new BufferedReader(new InputStreamReader(is));
     1057        while(!Thread.currentThread().isInterrupted() && (message = eReader.readLine()) != null) {
     1058            if(!message.equals("\n")) {
     1059            System.err.println("**** Perl STDOUT: " + message);
     1060            }
     1061        }
     1062        if(Thread.currentThread().isInterrupted()) {
     1063            System.err.println("**** Perl INTERRUPTed.");
     1064        } else {
     1065            System.err.println("**** Perl ENDed.");
     1066        }
     1067       
    6511068        } catch(Exception e) {
    6521069        System.err.println("Thread - caught exception: " + e);
    653         }
     1070        } finally {
     1071        if(Thread.currentThread().isInterrupted()) {
     1072            SafeProcess.log("@@@ Successfully interrupted " + Thread.currentThread().getName() + ".");
     1073        }
     1074        SafeProcess.closeResource(eReader);
     1075        eReader = null;
     1076        }
     1077    }
     1078    }
     1079
     1080
     1081    private class ProcessErrHandler extends SafeProcess.CustomProcessHandler {
     1082   
     1083    public ProcessErrHandler() {
     1084        super(SafeProcess.STDERR);
     1085    }
     1086
     1087    public void run(Closeable stream) {
     1088        InputStream eis = (InputStream) stream;
     1089       
     1090        BufferedReader br = null;
     1091        try {
     1092        br = new BufferedReader(new InputStreamReader(eis));
     1093       
     1094        // Capture the standard error stream and search for two particular occurrences.
     1095        String line="";
     1096        boolean ignore_for_robots = false;
     1097        int max_download = DownloadJob.UNKNOWN_MAX;
     1098       
     1099        // handle to outer class objects that need synchronization (on either objects or their methods)
     1100        DownloadProgressBar progress = DownloadJob.this.progress;
     1101        AppendLineOnlyFileDocument download_log = DownloadJob.this.download_log;
     1102
     1103        while (!Thread.currentThread().isInterrupted() && (line = br.readLine()) != null
     1104               && !line.trim().equals("<<Finished>>") /*&& !isStopped()*/) {
     1105            if (max_download == DownloadJob.UNKNOWN_MAX) {
     1106            if(line.lastIndexOf("<<Defined Maximum>>") != -1) {
     1107                max_download = DownloadJob.DEFINED_MAX;
     1108            }
     1109            else if (line.lastIndexOf("<<Undefined Maximum>>") != -1) {
     1110                max_download = DownloadJob.UNDEFINED_MAX;
     1111            }
     1112            }
     1113            else if(max_download == DownloadJob.UNDEFINED_MAX) {
     1114            DebugStream.println(line);         
     1115            download_log.appendLine(line); // now synchronized
     1116            // The first magic special test is to see if we've just
     1117            // asked for the robots.txt file. If so we ignore
     1118            // the next add and then the next complete/error.
     1119            if(line.lastIndexOf("robots.txt;") != -1) {
     1120                DebugStream.println("***** Requesting robot.txt");
     1121                ignore_for_robots = true;
     1122            }
     1123            // If line contains "=> `" display text as the
     1124            // currently downloading url. Unique to add download.
     1125            else if(line.lastIndexOf("=> `") != -1) {
     1126                if(!ignore_for_robots) {
     1127                // Add download
     1128                String new_url = line.substring(line.indexOf("`") + 1, line.lastIndexOf("'"));
     1129               
     1130                // now synchronized
     1131                progress.addDownload("file"); //addDownload("http:/" + new_url.substring(cachedir_prefix_length()-1));
     1132                }
     1133            }
     1134            // If line contains "/s) - `" set currently
     1135            // downloading url to "Download Complete".
     1136            else if(line.lastIndexOf("/s) - `") != -1) {
     1137                String current_file_downloading = line.substring(line.indexOf("`") + 1, line.lastIndexOf("'"));
     1138                if(!ignore_for_robots) {
     1139                DebugStream.println("Not ignore for robots");
     1140                // Download complete
     1141                downloadComplete(current_file_downloading); // synchronized
     1142                }
     1143                else {
     1144                DebugStream.println("Ignore for robots");
     1145                ignore_for_robots = false;
     1146                }
     1147            }
     1148            // The already there line begins "File `..." However this
     1149            // is only true in English, so instead I looked and there
     1150            // are few (if any at all) other messages than those above
     1151            // and not overwriting messages that use " `" so we'll
     1152            // look for that. Note this method is not guaranteed to be
     1153            // unique like the previous two.
     1154            else if(line.lastIndexOf(" `") != -1) {
     1155                // Not Overwriting
     1156                DebugStream.println("Already there.");
     1157                String new_url = line.substring(line.indexOf("`") + 1, line.lastIndexOf("'"));
     1158               
     1159                progress.addDownload("file"); //addDownload("http:/" + new_url.substring(cachedir_prefix_length()-1));
     1160                downloadWarning();
     1161            }
     1162            // Any other important message starts with the time in the form hh:mm:ss
     1163            else if(line.length() > 7) {
     1164                if(line.charAt(2) == ':' && line.charAt(5) == ':') {
     1165                if(!ignore_for_robots) {
     1166                    DebugStream.println("Error.");
     1167                    downloadFailed();
     1168                }
     1169                else {
     1170                    ignore_for_robots = false;
     1171                }
     1172                }
     1173            }
     1174            }
     1175            else if (max_download == DownloadJob.DEFINED_MAX) {
     1176            if (line.lastIndexOf("<<Total number of record(s):") != -1) {
     1177                String total_ID = line.substring(line.indexOf(":") + 1, line.indexOf(">"));
     1178
     1179                progress.setTotalDownload((Integer.valueOf(total_ID)).intValue());
     1180                progress.resetFileCount(); 
     1181                progress.addDownload("files"); // for display: "Downloading files"
     1182
     1183            }
     1184            else if (line.lastIndexOf("<<Done>>") != -1) {
     1185                progress.increaseFileCount();
     1186            }
     1187            else if(line.lastIndexOf("<<Done:") != -1) {
     1188                String completed_amount = line.substring(line.indexOf(":") + 1, line.indexOf(">"));
     1189                progress.increaseFileCount((Integer.valueOf(completed_amount)).intValue());
     1190            }
     1191           
     1192            DebugStream.println(line);
     1193            download_log.appendLine(line);
     1194            }
     1195            else {
     1196            System.out.println("Error!!");
     1197            System.exit(-1);
     1198            }
     1199        }
     1200       
     1201        } catch (IOException ioe) {
     1202        //message(Utility.ERROR, ioe.toString());
     1203        //JTest
     1204        DebugStream.printStackTrace(ioe);
     1205
     1206        } finally {
     1207        if(Thread.currentThread().isInterrupted()) { // if the thread this class is running in is interrupted
     1208            SafeProcess.log("@@@ Successfully interrupted " + Thread.currentThread().getName() + ".");         
     1209        }
     1210
     1211        SafeProcess.closeResource(br);
     1212        br = null;
     1213        }
     1214       
    6541215    }
    6551216    }
Note: See TracChangeset for help on using the changeset viewer.