// NOTE(review): THIS FILE IS DAMAGED AND WILL NOT COMPILE AS SHOWN.
//
//   * Its line breaks have been lost. The whole program is collapsed onto a
//     line that begins with "//", so the compiler treats that entire line as
//     a single comment. The next line then starts in the middle of a string
//     literal ("Waiting for all outstanding tasks to complete\n"), which is a
//     syntax error.
//   * Text appears to have been deleted between each '<' and the next '>'
//     (consistent with HTML-tag stripping). Stray '>' characters with no
//     preceding '<' ("> /dev/null", "2>&1") survived. Visible consequences:
//       - every "#include" after "mpi.h" has lost its header name;
//       - the usage message "Usage: mpiterrierfileindexer \n" no longer lists
//         the three expected arguments;
//       - the master code jumps from "for ( int j=0; j" straight to
//         " actualTasks)". This loses the processor-state initialisation and
//         the manifest dispatch code, including wherever `dest` is declared
//         for the "if (dest > actualTasks)" test;
//       - the wait code jumps from "for ( int k=0; k" straight into the tail
//         of a worker's FileIndexer command string. This loses the rest of
//         the master branch and the start of the worker branch, including
//         the declarations of `command`, `counter` and the worker's
//         `incoming` buffer, and the condition that chooses between the two
//         sprintf() variants.
//   Restore this file from version control. Do not try to rebuild the
//   missing code by hand.
//
// What the surviving code shows:
//   * Exactly three arguments are required (argc must be 4):
//       argv[1] = GSDL home directory (gsdlhome_dir)
//       argv[2] = Terrier directory (terrier_dir)
//       argv[3] = total number of manifest files (parsed with atoi)
//   * Rank 0 (master) iterates over manifest files (manifest_counter). It
//     tracks per-worker MPI_Request handles and the highest worker in use
//     (actualTasks), then waits for outstanding tasks. The dispatch calls
//     themselves are in the missing text.
//   * Each other rank (worker) loops until it receives the string "end". Per
//     task it builds a shell command that runs Terrier's
//     org.terrier.applications.FileIndexer with
//     "-index -path <terrier_dir>/var/manifest-<incoming>.spec -prefix
//     <incoming>", and logs to /tmp/terrier-index-W<rank>-C<counter>.log.
//     One variant wraps this in `bash -c "cd <gsdlhome_dir> && source
//     setup.bash > /dev/null && ..."`. The other variant's format string is
//     partly missing. After system(command), the worker sends a single char
//     to rank 0 with tag 1 to report completion.
//
// Further review notes on the surviving code:
//   * The "// invoke dspace" comment is stale. The command runs Terrier's
//     FileIndexer, not DSpace.
//   * The return status of system(command) is ignored. A failed index run is
//     still reported to the master as complete.
//   * MPI_Errhandler_set was deprecated in MPI-2.0 and removed in MPI-3.0.
//     The replacement is MPI_Comm_set_errhandler(MPI_COMM_WORLD,
//     MPI_ERRORS_RETURN).
//   * The command is built with unbounded sprintf() from argv paths and
//     received text. The size of `command` is declared in the missing text,
//     so it cannot be checked here. Consider snprintf().
// Master-worker program to prepare, parallel index, and then merge a terrier // based collection // // John Thompson // 3rd August 2012 #include "mpi.h" #include #include #include #include #include #include #include using namespace std; #define BUFFERSIZE 512 int main ( int argc, char *argv [] ) { int numtasks, rank, rc; // MPI variables unsigned long int seconds = 0; if (4 != argc ) { fprintf(stderr,"Usage: mpiterrierfileindexer \n"); exit(-1); } char *gsdlhome_dir = argv[1]; char *terrier_dir = argv[2]; int total_number_of_manifest_files = atoi(argv[3]); // start MPI environment rc = MPI_Init(&argc,&argv); if (rc != MPI_SUCCESS) { fprintf(stderr, "Error starting MPI program. Terminating.\n"); MPI_Abort(MPI_COMM_WORLD, rc); } // We'll handle errors ourselves MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // get MPI variables: number of processors and processor number MPI_Status stat; MPI_Comm_size(MPI_COMM_WORLD, &numtasks); MPI_Comm_rank(MPI_COMM_WORLD, &rank); // Get processor name too - important when it could be anywhere in a cluster int name_length; char processor_name[MPI_MAX_PROCESSOR_NAME]; MPI_Get_processor_name(processor_name, &name_length); // master node processing if (rank == 0) { seconds = time(NULL); fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name); char incoming[BUFFERSIZE]; // buffer for acknowledgments char buffer[BUFFERSIZE]; // buffer to send tasks MPI_Request request[BUFFERSIZE]; // request monitor for all tasks MPI_Status status[BUFFERSIZE]; // status monitor for all tasks int actualTasks = 0; // number of processors running int manifest_counter = 0; // set initial status of all processors to idle fprintf(stderr, "[M] Initializing processor state\n"); for ( int j=0; j actualTasks) { actualTasks = dest; fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks); } // onto the next manifest file manifest_counter++; } // wait until all outstanding tasks are completed fprintf(stderr, "[M] 
Waiting for all outstanding tasks to complete\n"); int dest; for ( int k=0; k /tmp/terrier-index-W%d-C%d.log 2>&1", terrier_dir, terrier_dir, incoming, incoming, rank, counter); } else { sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-W%d-C%d.log 2>&1\"", gsdlhome_dir, terrier_dir, terrier_dir, incoming, incoming, rank, counter); } fprintf(stderr, "[W%d] system('%s')\n", rank, command); // invoke dspace system(command); // send completed message char line = ' '; MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD); seconds = time(NULL); fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds); } } while (strcmp (incoming, "end") != 0); // stop when "end" instruction is received seconds = time(NULL); fprintf(stderr, "[W%d:%lu] Exiting\n", rank, seconds); } // clean up MPI environment if (rank == 0) { fprintf(stderr,"[M] Finalizing...\n"); } else { fprintf(stderr,"[W%d] Finalizing...\n", rank); } MPI_Finalize(); }