Changeset 25839


Ignore:
Timestamp:
2012-06-28T09:17:21+12:00 (12 years ago)
Author:
jmt12
Message:

Adding more timing to threads

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp

    r24833 r25839  
    1818#include <stdio.h>
    1919#include <stdlib.h>
     20#include <time.h>
    2021
    2122#include <fstream>
     
    3435{
    3536  int numtasks, rank, rc;            // MPI variables
     37  unsigned long int seconds = 0;
    3638
    3739  if (5 != argc && argc != 6 )
    3840  {
    39     cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;
     41    fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n");
    4042    exit(-1);
    4143  }
     
    4648  char *gsdlhomedir = argv[3];      // location of import script
    4749  char *collection = argv[4];        // Greenstone collection
    48   char *site = "";
     50  char *site = NULL;
    4951  if (argc == 6)
    5052  {
     
    5658  if (rc != MPI_SUCCESS)
    5759  {
    58     printf ("Error starting MPI program. Terminating.\n");
     60    fprintf(stderr, "Error starting MPI program. Terminating.\n");
    5961    MPI_Abort(MPI_COMM_WORLD, rc);
    6062  }
     
    6870  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    6971
     72  // Get processor name too - important when it could be anywhere in a cluster
     73  int name_length;
     74  char processor_name[MPI_MAX_PROCESSOR_NAME];
     75  MPI_Get_processor_name(processor_name, &name_length);
     76
    7077  // master node processing
    7178  if (rank == 0)
    7279  {
    73     cerr << " * Master Starting" << endl;
     80    seconds = time(NULL);
     81    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
    7482    char incoming[BUFFERSIZE];          // buffer for acknowledgments
    7583    char buffer[BUFFERSIZE];         // buffer to send tasks
     
    8694
    8795    // set initial status of all processors to idle
    88     cerr << " - initializing processor state" << endl;
     96    fprintf(stderr, "[M] Initializing processor state\n");
    8997    for ( int j=0; j<BUFFERSIZE; j++ )
    9098    {
     
    94102    // scan through contents of file listing
    95103    int manifest_file_count = 1;
    96     cerr << " - processing contents of filelist.txt" << endl;
     104    fprintf(stderr, "[M] Processing contents of filelist.txt\n");
    97105    while (!infile.eof ())
    98106    {
    99107      // get a filename
    100108      getline (infile, line);
     109
     110      if ( line.length() == 0 && !infile.eof() )
     111      {
     112        fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist);
     113      }
     114
    101115      if (line.length() > 0)
    102116      {
     
    104118        int dest=0;
    105119        int found = 0;
    106         cerr << " - searching for idle processor" << endl;
     120        fprintf(stderr, "[M] Searching for idle processor\n");
    107121        while ((dest<(numtasks-1)) && (found == 0))
    108122        {
     
    120134        if (found == 0)
    121135        {
    122           cerr << " - waiting for processor to become idle" << endl;
     136          fprintf(stderr, "[M] Waiting for processor to become idle\n");
    123137          MPI_Waitany (numtasks-1, request, &dest, status);
    124138        }
    125139
    126140        // construct manifest filename
    127         cerr << " - creating manifest file: number " << manifest_file_count << endl;
     141        fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count);
    128142    stringstream manifestfilename_strstr;
    129143    manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
     
    138152          if (manifestfile.fail())
    139153          {
    140             cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
     154            fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
    141155          }
    142156        }
    143157        else
    144158        {
    145           cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
     159          fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
    146160        }
    147161        manifestfile << "<Manifest><Index>" << endl;
     
    165179        manifestfile.close ();
    166180
    167         cerr << " - manifest file complete" << endl;
     181        fprintf(stderr, "[M0] Manifest file complete\n");
    168182
    169183        // Store manifest filepath so we can remove it later
     
    171185
    172186        // Send the manifest filename as the instruction
    173         cerr << " - writing manifest filename as instruction to worker" << endl;
     187        fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n");
    174188        sprintf(buffer, "%s", manifestfilename);
    175189
     
    177191        incoming[dest] = 'B';
    178192        // send out the job to the processor
    179         cerr << " - asking worker to start" << endl;
     193        fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1));
    180194        MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
    181195        // wait for a done acknowledgement
    182196        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
    183         cerr << " - worker replied that it has started" << endl;
     197        fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1));
    184198        // update counter of actual tasks
    185199        if (dest > actualTasks)
    186200        {
    187201          actualTasks = dest;
    188           cerr << " - increased the number of running workers to: " << actualTasks << endl;
     202          fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks);
    189203        }
    190204    // increase number of manifest files processed
     
    196210
    197211    // wait until all outstanding tasks are completed
    198     cerr << " - waiting for all outstanding tasks to complete" << endl;
     212    fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n");
    199213    int dest;
    200214    for ( int k=0; k<actualTasks; k++ )
     
    204218
    205219    // send message to end all processing engines
    206     cerr << " * Master asking children to exit" << endl;
     220    fprintf(stderr, "[M0] Master asking children to exit\n");
    207221    char endstr[5] = "end";
    208222    for ( int i=1; i<numtasks; i++ )
     
    212226
    213227    // Free up manifest files
    214     cerr << " - freeing shared memory used by manifest files" << endl;
     228    fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n");
    215229    for ( int i = 0; i < manifest_files.size(); i++)
    216230    {
    217231      free(manifest_files[i]);
    218232    }
    219     cerr << " * Master Exiting" << endl;
    220   }
    221   // slave node processing
     233    seconds = time(NULL);
     234    fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds);
     235  }
     236  // worker node processing
    222237  else
    223238  {
    224     ///out << "Worker Starting" << endl;
     239    seconds = time(NULL);
     240    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
     241
     242    // Check to see if GSDLHOME exists in the environment (it will on multicore
     243    // computer, but won't on compute nodes in a cluster). It will be NULL if
     244    // source setup.bash hasn't been run (on this computer).
     245    const char * gsdlhometest = getenv("GSDLHOME");
     246
    225247    char incoming[BUFFERSIZE];
    226 
    227248    int counter = 0;
    228 
    229249    do
    230250    {
     
    233253      if (resval != MPI_SUCCESS)
    234254      {
    235         printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
     255        fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
    236256        MPI_Abort(MPI_COMM_WORLD, rc);
    237257      }
     
    240260      {
    241261        // process a received job
    242         cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl;
    243 
     262        seconds = time(NULL);
     263        fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
    244264        // create Greenstone import command
    245265        // - incoming now contains the name of the manifest file to process
    246266        char command[2048];
    247 
    248267#ifdef HARDAFFINITY
    249268        int cpu = rank - 1;
    250         cerr << "Setting affinity for worker " << rank << " to cpu " << cpu << endl;
    251         if (site != "")
    252         {
    253           sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", cpu, gsdlhomedir, incoming, site, collection);
     269        fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu);
     270        char affinity[16];
     271        sprintf(affinity, "taskset -c %d", cpu);
     272#else
     273        char affinity[16] = "";
     274#endif
     275        if (site != NULL && strlen(site) > 0)
     276        {
     277          if (gsdlhometest != NULL)
     278          {
     279            sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter);
     280          }
     281          else
     282          {
     283            sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter);
     284          }
     285        }
     286        else if (gsdlhometest != NULL)
     287        {
     288          sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter);
    254289        }
    255290        else
    256291        {
    257           sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", cpu, gsdlhomedir, incoming, collection);
    258         }
    259 #else
    260         if (site != "")
    261         {
    262           sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection);
    263         }
    264         else
    265         {
    266           //sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", gsdlhomedir, incoming, collection);
    267           sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s > %s/collect/%s/logs/import-p%d-%d.log 2>&1", gsdlhomedir, incoming, collection, gsdlhomedir, collection, rank, counter);
    268         }
    269 #endif
    270 
    271         cerr << "**** cmd = " << command << endl;
     292          sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1\"", affinity, gsdlhomedir, incoming, collection, rank, counter);
     293        }
     294
     295        fprintf(stderr, "[W%d] system('%s')\n", rank, command);
    272296
    273297        // invoke Greenstone import with manifest file
    274298        system (command);
    275         cerr << "**** complete" << endl;
     299        seconds = time(NULL);
     300        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
    276301        char line = ' ';
    277302        // send completed message
     
    281306    while (strcmp (incoming, "end") != 0);
    282307    // stop when "end" instruction is received
    283     cerr << "Worker Exiting" << endl;
    284   }
    285 
    286   // clean up MPI environment
    287   cerr << "Finalizing..." << endl;
     308    seconds = time(NULL);
     309    fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
     310  }
     311  ///err << "Finalizing..." << endl;
    288312  MPI_Finalize();
    289313}
Note: See TracChangeset for help on using the changeset viewer.