// Master-worker program to read in a list of files and invoke // import on each separately using manifest files in Greenstone 3, // with synchronisation using OpenMPI // // Hussein Suleman // 1 July 2010 // 2011MAR - added a bunch of commands to aid [my] understanding. jmt12 // - made site argument optional to support GS2. jmt12 // - moving manifest writing code into the rank=0 thread. This will // remove the artificial limit on epoc size caused by size of // message buffer between controller and child threads. jmt12 #include "mpi.h" #include #include #include #include #include #include using namespace std; #define BUFFERSIZE 512 int main ( int argc, char *argv [] ) { int numtasks, rank, rc; // MPI variables if (5 != argc && argc != 6 ) { cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl; exit(-1); } char *filelist = argv[1]; // list of filenames char *epochStr = argv[2]; // number of files per task int epoch = atoi (epochStr); char *gsdlhomedir = argv[3]; // location of import script char *collection = argv[4]; // Greenstone collection char *site = ""; if (argc == 6) { site = argv[5]; // Greenstone site } // start MPI environment rc = MPI_Init(&argc,&argv); if (rc != MPI_SUCCESS) { printf ("Error starting MPI program. Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, rc); } // We'll handle errors ourselves MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // get MPI variables: number of processors and processor number MPI_Status stat; MPI_Comm_size(MPI_COMM_WORLD, &numtasks); MPI_Comm_rank(MPI_COMM_WORLD, &rank); // master node processing if (rank == 0) { char incoming[BUFFERSIZE]; // buffer for acknowledgments char buffer[BUFFERSIZE]; // buffer to send tasks MPI_Request request[BUFFERSIZE]; // request monitor for all tasks MPI_Status status[BUFFERSIZE]; // status monitor for all tasks int actualTasks = 0; // number of processors running vector manifest_files; // Keep track of temp manifest filenames // open file listing filenames to process ifstream infile; infile.open (filelist); string line; // set initial status of all processors to idle for ( int j=0; j 0) { // search for idle processor int dest=0; int found = 0; while ((dest<(numtasks-1)) && (found == 0)) { if (incoming[dest] == ' ') { found = 1; } else { dest++; } } // if no idle processor, wait for one to become idle if (found == 0) { MPI_Waitany (numtasks-1, request, &dest, status); } // construct manifest filename char manifestfile_buffer[128]; sprintf (manifestfile_buffer, "%u.manifest.xml", rank); char* manifestfilename = tempnam(NULL,manifestfile_buffer); ofstream manifestfile; if (manifestfilename != NULL) { // create manifest file manifestfile.open (manifestfilename); if (manifestfile.fail()) { cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl; } } else { cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl; } manifestfile << "" << endl; // add the first filename to the instruction manifestfile << "" << line << "" << endl; int epochCounter = epoch; // if epoch>1 and more filenames, add more filenames while ((epochCounter > 1) && (!infile.eof ())) { getline (infile, line); if (line.length () > 0) { manifestfile << "" << line << "" << endl; epochCounter--; } } manifestfile << "" << endl; manifestfile.close (); // Store manifest filepath so we can remove it later manifest_files.push_back(manifestfilename); // Send the manifest filename as the instruction sprintf(buffer, "%s", manifestfilename); // mark processors as busy incoming[dest] = 'B'; // send out the job to the processor MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); // wait for a done acknowledgement MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); // update counter of actual tasks if (dest > actualTasks) { actualTasks = dest; } } } infile.close(); // wait until all outstanding tasks are completed int dest; for ( int k=0; k