// Master-worker program to read in a list of files and invoke // dspace filter-media on each separately using OpenMPI // // John Thompson // 8 June 2012 #include "mpi.h" #include #include #include #include #include #include #include using namespace std; #define BUFFERSIZE 512 int main ( int argc, char *argv [] ) { int numtasks, rank, rc; // MPI variables unsigned long int seconds = 0; if (4 != argc ) { fprintf(stderr,"Usage: mpidspacemediafilter gsdlhome dspacehome filelist\n"); exit(-1); } char *gsdlhomedir = argv[1]; char *dspacehomedir = argv[2]; // location of import script char *filelist = argv[3]; // start MPI environment rc = MPI_Init(&argc,&argv); if (rc != MPI_SUCCESS) { fprintf(stderr, "Error starting MPI program. Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, rc); } // We'll handle errors ourselves MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // get MPI variables: number of processors and processor number MPI_Status stat; MPI_Comm_size(MPI_COMM_WORLD, &numtasks); MPI_Comm_rank(MPI_COMM_WORLD, &rank); // Get processor name too - important when it could be anywhere in a cluster int name_length; char processor_name[MPI_MAX_PROCESSOR_NAME]; MPI_Get_processor_name(processor_name, &name_length); // master node processing if (rank == 0) { seconds = time(NULL); fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name); char incoming[BUFFERSIZE]; // buffer for acknowledgments char buffer[BUFFERSIZE]; // buffer to send tasks MPI_Request request[BUFFERSIZE]; // request monitor for all tasks MPI_Status status[BUFFERSIZE]; // status monitor for all tasks int actualTasks = 0; // number of processors running // open file listing filenames to process ifstream infile; infile.open (filelist); string line_str; // set initial status of all processors to idle fprintf(stderr, "[M] Initializing processor state\n"); for ( int j=0; j 0) { // search for idle processor int dest=0; int found = 0; fprintf(stderr, "[M] Searching for idle processor\n"); while ((dest<(numtasks-1)) && (found == 0)) { if (incoming[dest] == ' ') { found = 1; } else { dest++; } } // if no idle processor, wait for one to become idle if (found == 0) { fprintf(stderr, "[M] Waiting for processor to become idle\n"); MPI_Waitany (numtasks-1, request, &dest, status); } // No need for manifests - just send the identifier of the item // to filter to the worker fprintf(stderr, "[M] Writing dspace identifier as instruction to worker\n"); // Jiggerypokery to get around weird compiler error: cannot pass // objects of non-POD type ‘struct std::string’ through ‘...’; call // will abort at runtime char *line = new char [line_str.size() + 1]; strcpy(line, line_str.c_str()); sprintf(buffer, "%s", line); delete [] line; // mark processors as busy incoming[dest] = 'B'; // send out the job to the processor fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1)); MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); // wait for a done acknowledgement MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); fprintf(stderr, "[M] Worker %d replied that it has started\n", (dest + 1)); // update counter of actual tasks if (dest > actualTasks) { actualTasks = dest; fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks); } } } infile.close(); // wait until all outstanding tasks are completed fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n"); int dest; for ( int k=0; k /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter); } else { sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/dspace filter-media -f -i %s > /tmp/dspace_media_filter-P%d-C%d.log 2>&1\"", gsdlhomedir, dspacehomedir, incoming, rank, counter); } fprintf(stderr, "[W%d] system('%s')\n", rank, command); // invoke dspace system (command); // send completed message char line = ' '; MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD); seconds = time(NULL); fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds); } } while (strcmp (incoming, "end") != 0); // stop when "end" instruction is received seconds = time(NULL); fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds); } // clean up MPI environment if (rank == 0) { fprintf(stderr,"[M] Finalizing...\n"); } else { fprintf(stderr,"[W%d] Finalizing...\n", rank); } MPI_Finalize(); }