// NOTE(review): this copy of the file appears to be damaged and will not
// compile as-is. Restore a pristine copy from version control before making
// any functional change.
//   * The original line breaks have been collapsed into four physical lines,
//     so every "//" comment now swallows the code that follows it on the same
//     physical line.
//   * Text that looks like it sat between a '<' and the following '>' is
//     missing: the #include directives after "mpi.h" have no header names,
//     `vector` has no element type, the loop headers `for ( int j=0; j 0)` and
//     `for ( int k=0; k 0)` are truncated, and the string literals written to
//     the manifest file are empty ("").
//   * As a result manifest_file_count, gsdlhometest, affinity, command and
//     counter are used below without any visible declaration, and the code
//     that reads the filelist, tells the workers to stop, and receives
//     instructions on the worker side is not visible here.
//
// main(argc, argv): "mpiimport filelist epoch gsdlhome collection [site]".
//   Rank 0 (master): opens `filelist`, and for each batch writes a manifest
//   file at <gsdlhome>/collect/<collection>/tmp/manifest.<n>.xml holding one
//   filename plus further non-empty lines while the epoch counter is above 1,
//   sends the manifest path to a worker whose slot in incoming[] is ' '
//   (waiting with MPI_Waitany when none is), marks that slot 'B' and posts an
//   MPI_Irecv for the worker's one-character acknowledgement.
//   Other ranks (workers): build an import.pl command line for the string in
//   `incoming`, run it with system(), send a single ' ' back to rank 0, and
//   loop until `incoming` equals "end". (How `incoming` is filled on the
//   worker side is in the missing text - presumably an MPI_Recv; confirm.)
//
// NOTE(review): things visible in the surviving text that should be checked
// against the pristine source:
//   * In the last sprintf (bash, no -site) the arguments are passed as
//     `affinity, gsdlhomedir`, whereas the -site bash variant passes
//     `gsdlhomedir, affinity` for the same "cd %s && ... && %s import.pl"
//     layout - the non-site order looks swapped.
//   * The -site bash format string opens with `bash -c \"` but has no closing
//     `\"` after `2>&1`, unlike the non-site variant.
//   * `if (dest > actualTasks) actualTasks = dest;` stores a 0-based slot index
//     in a variable commented as a count - possibly off by one; how it is used
//     is in the missing text.
//   * The manifest path is copied into `buffer` (BUFFERSIZE = 512 bytes) with
//     an unbounded sprintf.
// Master-worker program to read in a list of files and invoke // import on each separately using manifest files in Greenstone 3, // with synchronisation using OpenMPI // // Hussein Suleman // 1 July 2010 // 2011MAR - added a bunch of commands to aid (my) understanding. // - made site argument optional to support GS2. // - moving manifest writing code into the rank=0 thread. This will // remove the artificial limit on epoch size caused by size of // message buffer between controller and child threads. [jmt12] // 2011SEP - changing where the manifest files are written so I have the // ability to set up the location as a RAMDisk. [jmt12] #include "mpi.h" #include #include #include #include #include #include #include #include using namespace std; //#define HARDAFFINITY 1 #define BUFFERSIZE 512 int main ( int argc, char *argv [] ) { int numtasks, rank, rc; // MPI variables unsigned long int seconds = 0; if (5 != argc && argc != 6 ) { fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n"); exit(-1); } char *filelist = argv[1]; // list of filenames char *epochStr = argv[2]; // number of files per task int epoch = atoi (epochStr); char *gsdlhomedir = argv[3]; // location of import script char *collection = argv[4]; // Greenstone collection char *site = NULL; if (argc == 6) { site = argv[5]; // Greenstone site } // start MPI environment rc = MPI_Init(&argc,&argv); if (rc != MPI_SUCCESS) { fprintf(stderr, "Error starting MPI program. 
Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, rc); } // We'll handle errors ourselves MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // get MPI variables: number of processors and processor number MPI_Status stat; MPI_Comm_size(MPI_COMM_WORLD, &numtasks); MPI_Comm_rank(MPI_COMM_WORLD, &rank); // Get processor name too - important when it could be anywhere in a cluster int name_length; char processor_name[MPI_MAX_PROCESSOR_NAME]; MPI_Get_processor_name(processor_name, &name_length); // master node processing if (rank == 0) { seconds = time(NULL); fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name); char incoming[BUFFERSIZE]; // buffer for acknowledgments char buffer[BUFFERSIZE]; // buffer to send tasks MPI_Request request[BUFFERSIZE]; // request monitor for all tasks MPI_Status status[BUFFERSIZE]; // status monitor for all tasks int actualTasks = 0; // number of processors running vector manifest_files; // Keep track of temp manifest filenames // open file listing filenames to process ifstream infile; infile.open (filelist); string line; // set initial status of all processors to idle fprintf(stderr, "[M] Initializing processor state\n"); for ( int j=0; j 0) { // search for idle processor int dest=0; int found = 0; fprintf(stderr, "[M] Searching for idle processor\n"); while ((dest<(numtasks-1)) && (found == 0)) { if (incoming[dest] == ' ') { found = 1; } else { dest++; } } // if no idle processor, wait for one to become idle if (found == 0) { fprintf(stderr, "[M] Waiting for processor to become idle\n"); MPI_Waitany (numtasks-1, request, &dest, status); } // construct manifest filename fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count); stringstream manifestfilename_strstr; manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." 
<< manifest_file_count << ".xml"; string manifestfilename_str = manifestfilename_strstr.str(); char *manifestfilename = new char [manifestfilename_str.size() + 1]; strcpy(manifestfilename, manifestfilename_str.c_str()); ofstream manifestfile; if (manifestfilename != NULL) { // create manifest file manifestfile.open (manifestfilename); if (manifestfile.fail()) { fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename); } } else { fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename); } manifestfile << "" << endl; // add the first filename to the instruction manifestfile << "" << line << "" << endl; int epochCounter = epoch; // if epoch>1 and more filenames, add more filenames while ((epochCounter > 1) && (!infile.eof ())) { getline (infile, line); if (line.length () > 0) { manifestfile << "" << line << "" << endl; epochCounter--; } } manifestfile << "" << endl; manifestfile.close (); fprintf(stderr, "[M0] Manifest file complete\n"); // Store manifest filepath so we can remove it later manifest_files.push_back(manifestfilename); // Send the manifest filename as the instruction fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n"); sprintf(buffer, "%s", manifestfilename); // mark processors as busy incoming[dest] = 'B'; // send out the job to the processor fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1)); MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); // wait for a done acknowledgement MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1)); // update counter of actual tasks if (dest > actualTasks) { actualTasks = dest; fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks); } // increase number of manifest files processed manifest_file_count++; } } 
infile.close(); // wait until all outstanding tasks are completed fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n"); int dest; for ( int k=0; k 0) { if (gsdlhometest != NULL) { sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter); } else { sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter); } } else if (gsdlhometest != NULL) { sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter); } else { sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1\"", affinity, gsdlhomedir, incoming, collection, rank, counter); } fprintf(stderr, "[W%d] system('%s')\n", rank, command); // invoke Greenstone import with manifest file system (command); seconds = time(NULL); fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds); char line = ' '; // send completed message MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD); } } while (strcmp (incoming, "end") != 0); // stop when "end" instruction is received seconds = time(NULL); fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds); } ///err << "Finalizing..." << endl; MPI_Finalize(); }