Ignore:
Timestamp:
2012-07-13T12:06:06+12:00 (12 years ago)
Author:
jmt12
Message:

More debug comments, fixed memory leak - still hanging unfortunately

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/mpidspacemediafilter-src/mpidspacemediafilter.cpp

    r25810 r25944  
    2424{
    2525  int numtasks, rank, rc;            // MPI variables
    26 
    27   if (3 != argc )
    28   {
    29     cerr << "Usage: " << argv[0] << " dspacehome filelist" << endl;
     26  unsigned long int seconds = 0;
     27
     28  if (4 != argc )
     29  {
     30    fprintf(stderr,"Usage: mpidspacemediafilter gsdlhome dspacehome filelist\n");
    3031    exit(-1);
    3132  }
    3233
    33   char *dspacehomedir = argv[1];      // location of import script
    34   char *filelist = argv[2];
     34  char *gsdlhomedir = argv[1];
     35  char *dspacehomedir = argv[2];      // location of import script
     36  char *filelist = argv[3];
    3537
    3638  // start MPI environment
     
    3840  if (rc != MPI_SUCCESS)
    3941  {
    40     printf ("Error starting MPI program. Terminating.\n");
     42    fprintf(stderr, "Error starting MPI program. Terminating.\n");
    4143    MPI_Abort(MPI_COMM_WORLD, rc);
    4244  }
     
    5052  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    5153
     54  // Get processor name too - important when it could be anywhere in a cluster
     55  int name_length;
     56  char processor_name[MPI_MAX_PROCESSOR_NAME];
     57  MPI_Get_processor_name(processor_name, &name_length);
     58
    5259  // master node processing
    5360  if (rank == 0)
    5461  {
    55     cerr << " * Master Starting" << endl;
     62    seconds = time(NULL);
     63    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
    5664    char incoming[BUFFERSIZE];          // buffer for acknowledgments
    5765    char buffer[BUFFERSIZE];         // buffer to send tasks
     
    6674
    6775    // set initial status of all processors to idle
    68     cerr << " - initializing processor state" << endl;
     76    fprintf(stderr, "[M] Initializing processor state\n");
    6977    for ( int j=0; j<BUFFERSIZE; j++ )
    7078    {
     
    7381
    7482    // scan through contents of file listing
    75     cerr << " - processing contents of filelist.txt" << endl;
     83    fprintf(stderr, "[M] Processing contents of filelist.txt\n");
    7684    while (!infile.eof ())
    7785    {
     
    8391        int dest=0;
    8492        int found = 0;
    85         cerr << " - searching for idle processor" << endl;
     93        fprintf(stderr, "[M] Searching for idle processor\n");
    8694        while ((dest<(numtasks-1)) && (found == 0))
    8795        {
     
    99107        if (found == 0)
    100108        {
    101           cerr << " - waiting for processor to become idle" << endl;
     109          fprintf(stderr, "[M] Waiting for processor to become idle\n");
    102110          MPI_Waitany (numtasks-1, request, &dest, status);
    103111        }
     
    105113        // No need for manifests - just send the identifier of the item
    106114        // to filter to the worker
    107         cerr << " - writing dspace identifier as instruction to worker" << endl;
     115        fprintf(stderr, "[M] Writing dspace identifier as instruction to worker\n");
     116        // Jiggerypokery to get around weird compiler error: cannot pass
     117        // objects of non-POD type ‘struct std::string’ through ‘...’; call
     118        // will abort at runtime
    108119        char *line = new char [line_str.size() + 1];
    109120        strcpy(line, line_str.c_str());
    110121        sprintf(buffer, "%s", line);
     122        delete [] line;
    111123
    112124        // mark processors as busy
    113125        incoming[dest] = 'B';
    114126        // send out the job to the processor
    115         cerr << " - asking worker to start" << endl;
     127        fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1));
    116128        MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
    117129        // wait for a done acknowledgement
    118130        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
    119         cerr << " - worker replied that it has started" << endl;
     131        fprintf(stderr, "[M] Worker %d replied that it has started\n", (dest + 1));
    120132        // update counter of actual tasks
    121133        if (dest > actualTasks)
    122134        {
    123135          actualTasks = dest;
    124           cerr << " - increased the number of running workers to: " << actualTasks << endl;
     136          fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks);
    125137        }
    126138      }
     
    130142
    131143    // wait until all outstanding tasks are completed
    132     cerr << " - waiting for all outstanding tasks to complete" << endl;
     144    fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n");
    133145    int dest;
    134146    for ( int k=0; k<actualTasks; k++ )
     
    138150
    139151    // send message to end all processing engines
    140     cerr << " * Master asking children to exit" << endl;
     152    fprintf(stderr,"[M] Master asking children to exit\n");
    141153    char endstr[5] = "end";
    142154    for ( int i=1; i<numtasks; i++ )
     
    145157    }
    146158
     159    seconds = time(NULL);
     160    fprintf(stderr, "[M:%lu] Master will exit when workers complete\n", seconds);
    147161  }
    148162  // slave node processing
    149163  else
    150164  {
    151     ///out << "Worker Starting" << endl;
     165    seconds = time(NULL);
     166    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
     167
     168    // Check to see if GSDLHOME exists in the environment (it will on multicore
     169    // computer, but won't on compute nodes in a cluster). It will be NULL if
     170    // source setup.bash hasn't been run (on this computer).
     171    const char * gsdlhometest = getenv("GSDLHOME");
     172
    152173    char incoming[BUFFERSIZE];
    153 
    154174    int counter = 0;
    155 
    156175    do
    157176    {
     
    160179      if (resval != MPI_SUCCESS)
    161180      {
    162         printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
     181        fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
    163182        MPI_Abort(MPI_COMM_WORLD, rc);
    164183      }
     
    167186      {
    168187        // process a received job
    169         cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl;
     188        seconds = time(NULL);
     189        fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
    170190
    171191        // create DSpace filter-media command
    172192        // - incoming now contains the identifier of the item to filter
    173193        char command[2048];
    174 
    175         sprintf (command, "%s/bin/dspace filter-media -f -i \"%s\" > /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter);
    176 
    177         cerr << "**** cmd = " << command << endl;
    178 
     194        if (gsdlhometest != NULL)
     195        {
     196          sprintf (command, "%s/bin/dspace filter-media -f -i \"%s\" > /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter);
     197        }
     198        else
     199        {
     200          sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/dspace filter-media -f -i %s > /tmp/dspace_media_filter-P%d-C%d.log 2>&1\"", gsdlhomedir, dspacehomedir, incoming, rank, counter);
     201        }
     202        fprintf(stderr, "[W%d] system('%s')\n", rank, command);
    179203        // invoke dspace
    180204        system (command);
    181         cerr << "**** complete" << endl;
     205
     206        // send completed message
    182207        char line = ' ';
    183         // send completed message
    184208        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
     209        seconds = time(NULL);
     210        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
    185211      }
    186212    }
    187213    while (strcmp (incoming, "end") != 0);
    188214    // stop when "end" instruction is received
    189     cerr << "Worker Exiting" << endl;
     215    seconds = time(NULL);
     216    fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
    190217  }
    191218
    192219  // clean up MPI environment
    193   cerr << "Finalizing..." << endl;
     220  if (rank == 0)
     221  {
     222    fprintf(stderr,"[M] Finalizing...\n");
     223  }
     224  else
     225  {
     226    fprintf(stderr,"[W%d] Finalizing...\n", rank);
     227  }
    194228  MPI_Finalize();
    195229}
Note: See TracChangeset for help on using the changeset viewer.