Changeset 25839

Show
Ignore:
Timestamp:
28.06.2012 09:17:21 (7 years ago)
Author:
jmt12
Message:

Adding more timing to threads

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp

    r24833 r25839  
    1818#include <stdio.h> 
    1919#include <stdlib.h> 
     20#include <time.h> 
    2021 
    2122#include <fstream> 
     
    3435{ 
    3536  int numtasks, rank, rc;            // MPI variables 
     37  unsigned long int seconds = 0; 
    3638 
    3739  if (5 != argc && argc != 6 ) 
    3840  { 
    39     cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl; 
     41    fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n"); 
    4042    exit(-1); 
    4143  } 
     
    4648  char *gsdlhomedir = argv[3];      // location of import script 
    4749  char *collection = argv[4];        // Greenstone collection 
    48   char *site = ""; 
     50  char *site = NULL; 
    4951  if (argc == 6) 
    5052  { 
     
    5658  if (rc != MPI_SUCCESS) 
    5759  { 
    58     printf ("Error starting MPI program. Terminating.\n"); 
     60    fprintf(stderr, "Error starting MPI program. Terminating.\n"); 
    5961    MPI_Abort(MPI_COMM_WORLD, rc); 
    6062  } 
     
    6870  MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
    6971 
     72  // Get processor name too - important when it could be anywhere in a cluster 
     73  int name_length; 
     74  char processor_name[MPI_MAX_PROCESSOR_NAME]; 
     75  MPI_Get_processor_name(processor_name, &name_length); 
     76 
    7077  // master node processing 
    7178  if (rank == 0) 
    7279  { 
    73     cerr << " * Master Starting" << endl; 
     80    seconds = time(NULL); 
     81    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name); 
    7482    char incoming[BUFFERSIZE];          // buffer for acknowledgments 
    7583    char buffer[BUFFERSIZE];         // buffer to send tasks 
     
    8694 
    8795    // set initial status of all processors to idle 
    88     cerr << " - initializing processor state" << endl; 
     96    fprintf(stderr, "[M] Initializing processor state\n"); 
    8997    for ( int j=0; j<BUFFERSIZE; j++ ) 
    9098    { 
     
    94102    // scan through contents of file listing 
    95103    int manifest_file_count = 1; 
    96     cerr << " - processing contents of filelist.txt" << endl; 
     104    fprintf(stderr, "[M] Processing contents of filelist.txt\n"); 
    97105    while (!infile.eof ()) 
    98106    { 
    99107      // get a filename 
    100108      getline (infile, line); 
     109 
     110      if ( line.length() == 0 && !infile.eof() ) 
     111      { 
     112        fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist); 
     113      } 
     114 
    101115      if (line.length() > 0) 
    102116      { 
     
    104118        int dest=0; 
    105119        int found = 0; 
    106         cerr << " - searching for idle processor" << endl; 
     120        fprintf(stderr, "[M] Searching for idle processor\n"); 
    107121        while ((dest<(numtasks-1)) && (found == 0)) 
    108122        { 
     
    120134        if (found == 0) 
    121135        { 
    122           cerr << " - waiting for processor to become idle" << endl; 
     136          fprintf(stderr, "[M] Waiting for processor to become idle\n"); 
    123137          MPI_Waitany (numtasks-1, request, &dest, status); 
    124138        } 
    125139 
    126140        // construct manifest filename 
    127         cerr << " - creating manifest file: number " << manifest_file_count << endl; 
     141        fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count); 
    128142    stringstream manifestfilename_strstr; 
    129143    manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml"; 
     
    138152          if (manifestfile.fail()) 
    139153          { 
    140             cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl; 
     154            fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename); 
    141155          } 
    142156        } 
    143157        else 
    144158        { 
    145           cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl; 
     159          fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename); 
    146160        } 
    147161        manifestfile << "<Manifest><Index>" << endl; 
     
    165179        manifestfile.close (); 
    166180 
    167         cerr << " - manifest file complete" << endl; 
     181        fprintf(stderr, "[M0] Manifest file complete\n"); 
    168182 
    169183        // Store manifest filepath so we can remove it later 
     
    171185 
    172186        // Send the manifest filename as the instruction 
    173         cerr << " - writing manifest filename as instruction to worker" << endl; 
     187        fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n"); 
    174188        sprintf(buffer, "%s", manifestfilename); 
    175189 
     
    177191        incoming[dest] = 'B'; 
    178192        // send out the job to the processor 
    179         cerr << " - asking worker to start" << endl; 
     193        fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1)); 
    180194        MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); 
    181195        // wait for a done acknowledgement 
    182196        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); 
    183         cerr << " - worker replied that it has started" << endl; 
     197        fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1)); 
    184198        // update counter of actual tasks 
    185199        if (dest > actualTasks) 
    186200        { 
    187201          actualTasks = dest; 
    188           cerr << " - increased the number of running workers to: " << actualTasks << endl; 
     202          fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks); 
    189203        } 
    190204    // increase number of manifest files processed 
     
    196210 
    197211    // wait until all outstanding tasks are completed 
    198     cerr << " - waiting for all outstanding tasks to complete" << endl; 
     212    fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n"); 
    199213    int dest; 
    200214    for ( int k=0; k<actualTasks; k++ ) 
     
    204218 
    205219    // send message to end all processing engines 
    206     cerr << " * Master asking children to exit" << endl; 
     220    fprintf(stderr, "[M0] Master asking children to exit\n"); 
    207221    char endstr[5] = "end"; 
    208222    for ( int i=1; i<numtasks; i++ ) 
     
    212226 
    213227    // Free up manifest files 
    214     cerr << " - freeing shared memory used by manifest files" << endl; 
     228    fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n"); 
    215229    for ( int i = 0; i < manifest_files.size(); i++) 
    216230    { 
    217231      free(manifest_files[i]); 
    218232    } 
    219     cerr << " * Master Exiting" << endl; 
    220   } 
    221   // slave node processing 
     233    seconds = time(NULL); 
     234    fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds); 
     235  } 
     236  // worker node processing 
    222237  else 
    223238  { 
    224     ///out << "Worker Starting" << endl; 
     239    seconds = time(NULL); 
     240    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name); 
     241 
     242    // Check whether GSDLHOME exists in the environment (it will on a multicore 
     243    // computer, but won't on compute nodes in a cluster). getenv() returns NULL 
     244    // if setup.bash hasn't been sourced on this computer. 
     245    const char * gsdlhometest = getenv("GSDLHOME"); 
     246 
    225247    char incoming[BUFFERSIZE]; 
    226  
    227248    int counter = 0; 
    228  
    229249    do 
    230250    { 
     
    233253      if (resval != MPI_SUCCESS) 
    234254      { 
    235         printf ("Error when recieving message from master [%d]... Terminating.\n", resval); 
     255        fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval); 
    236256        MPI_Abort(MPI_COMM_WORLD, rc); 
    237257      } 
     
    240260      { 
    241261        // process a received job 
    242         cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl; 
    243  
     262        seconds = time(NULL); 
     263        fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter); 
    244264        // create Greenstone import command 
    245265        // - incoming now contains the name of the manifest file to process 
    246266        char command[2048]; 
    247  
    248267#ifdef HARDAFFINITY 
    249268        int cpu = rank - 1; 
    250         cerr << "Setting affinity for worker " << rank << " to cpu " << cpu << endl; 
    251         if (site != "") 
    252         { 
    253           sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", cpu, gsdlhomedir, incoming, site, collection); 
     269        fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu); 
     270        char affinity[16]; 
     271        sprintf(affinity, "taskset -c %d", cpu); 
     272#else 
     273        char affinity[16] = ""; 
     274#endif 
     275        if (site != NULL && strlen(site) > 0) 
     276        { 
     277          if (gsdlhometest != NULL) 
     278          { 
     279            sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter); 
     280          } 
     281          else 
     282          { 
     283            sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter); 
     284          } 
     285        } 
     286        else if (gsdlhometest != NULL) 
     287        { 
     288          sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter); 
    254289        } 
    255290        else 
    256291        { 
    257           sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", cpu, gsdlhomedir, incoming, collection); 
    258         } 
    259 #else 
    260         if (site != "") 
    261         { 
    262           sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection); 
    263         } 
    264         else 
    265         { 
    266           //sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", gsdlhomedir, incoming, collection); 
    267           sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s > %s/collect/%s/logs/import-p%d-%d.log 2>&1", gsdlhomedir, incoming, collection, gsdlhomedir, collection, rank, counter); 
    268         } 
    269 #endif 
    270  
    271         cerr << "**** cmd = " << command << endl; 
     292          sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1\"", affinity, gsdlhomedir, incoming, collection, rank, counter); 
     293        } 
     294 
     295        fprintf(stderr, "[W%d] system('%s')\n", rank, command); 
    272296 
    273297        // invoke Greenstone import with manifest file 
    274298        system (command); 
    275         cerr << "**** complete" << endl; 
     299        seconds = time(NULL); 
     300        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds); 
    276301        char line = ' '; 
    277302        // send completed message 
     
    281306    while (strcmp (incoming, "end") != 0); 
    282307    // stop when "end" instruction is received 
    283     cerr << "Worker Exiting" << endl; 
    284   } 
    285  
    286   // clean up MPI environment 
    287   cerr << "Finalizing..." << endl; 
     308    seconds = time(NULL); 
     309    fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds); 
     310  } 
     311  ///err << "Finalizing..." << endl; 
    288312  MPI_Finalize(); 
    289313}