// Master-worker program to read in a list of files and invoke // dspace filter-media on each separately using OpenMPI // // John Thompson // 8 June 2012 #include "mpi.h" #include #include #include #include #include #include #include using namespace std; #define BUFFERSIZE 512 int main ( int argc, char *argv [] ) { int numtasks, rank, rc; // MPI variables if (3 != argc ) { cerr << "Usage: " << argv[0] << " dspacehome filelist" << endl; exit(-1); } char *dspacehomedir = argv[1]; // location of import script char *filelist = argv[2]; // start MPI environment rc = MPI_Init(&argc,&argv); if (rc != MPI_SUCCESS) { printf ("Error starting MPI program. Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, rc); } // We'll handle errors ourselves MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // get MPI variables: number of processors and processor number MPI_Status stat; MPI_Comm_size(MPI_COMM_WORLD, &numtasks); MPI_Comm_rank(MPI_COMM_WORLD, &rank); // master node processing if (rank == 0) { cerr << " * Master Starting" << endl; char incoming[BUFFERSIZE]; // buffer for acknowledgments char buffer[BUFFERSIZE]; // buffer to send tasks MPI_Request request[BUFFERSIZE]; // request monitor for all tasks MPI_Status status[BUFFERSIZE]; // status monitor for all tasks int actualTasks = 0; // number of processors running // open file listing filenames to process ifstream infile; infile.open (filelist); string line_str; // set initial status of all processors to idle cerr << " - initializing processor state" << endl; for ( int j=0; j 0) { // search for idle processor int dest=0; int found = 0; cerr << " - searching for idle processor" << endl; while ((dest<(numtasks-1)) && (found == 0)) { if (incoming[dest] == ' ') { found = 1; } else { dest++; } } // if no idle processor, wait for one to become idle if (found == 0) { cerr << " - waiting for processor to become idle" << endl; MPI_Waitany (numtasks-1, request, &dest, status); } // No need for manifests - just send the identifier of the item // to filter to the worker cerr << " - writing dspace identifier as instruction to worker" << endl; char *line = new char [line_str.size() + 1]; strcpy(line, line_str.c_str()); sprintf(buffer, "%s", line); // mark processors as busy incoming[dest] = 'B'; // send out the job to the processor cerr << " - asking worker to start" << endl; MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); // wait for a done acknowledgement MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); cerr << " - worker replied that it has started" << endl; // update counter of actual tasks if (dest > actualTasks) { actualTasks = dest; cerr << " - increased the number of running workers to: " << actualTasks << endl; } } } infile.close(); // wait until all outstanding tasks are completed cerr << " - waiting for all outstanding tasks to complete" << endl; int dest; for ( int k=0; k /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter); cerr << "**** cmd = " << command << endl; // invoke dspace system (command); cerr << "**** complete" << endl; char line = ' '; // send completed message MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD); } } while (strcmp (incoming, "end") != 0); // stop when "end" instruction is received cerr << "Worker Exiting" << endl; } // clean up MPI environment cerr << "Finalizing..." << endl; MPI_Finalize(); }