// Uses OpenMPI to allow several Greenstone build passes to be run in parallel.
// Author: John Thompson, 2010NOV18
// Adapted from code by: Hussein Suleman, 2010JUL01

// 0. Initialization

// Uncomment to display debug information
#define DEBUG 1
// Comment out to disable XML reading
#define TINYXML 1
// Fixed buffer size - needs to be fixed size to pass between processes
#define BUFFERSIZE 256
// Special message tags
#define EXITTAG 0
#define SUCCESSTAG 1

#include <mpi.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <sstream>
#include <string>
#include <queue>
#include <stack>
#include <vector>

#ifdef TINYXML
#include "tinyxml.h"
#endif

using namespace std;

struct task_t
{
  int id;
  int prerequisite; /* -1 for no prerequisite */
  string command;
};

// Let's call an ordered group of tasks a recipe, shall we?
typedef vector<task_t> recipe_t;

/* Function Prototypes */
static void debugPrint(int, string);
static void masterProcess(const char*);
static void parseRecipe(const char*, recipe_t*);
static void workerProcess(int);
#ifdef TINYXML
static int recurseRecipeXML(recipe_t*, TiXmlNode*, int, int);
#endif

/** */
int main(int argc, char **argv)
{
  if (argc != 2)
  {
    cerr << "usage: mpibuild.cpp <recipe xml path>" << endl;
    return 0;
  }
  // 1. Initialize MPI
  MPI_Init(&argc, &argv);
  // 2. Find out my identity in the 'world'
  int myrank;
  MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
  if (myrank == 0)
  {
    debugPrint(-1, "===== MPIBuild =====");
  }
  // - and dispatch to the appropriate chunk of code
  if (myrank == 0)
  {
    masterProcess(argv[1]);
  }
  else
  {
    workerProcess(myrank);
  }
  // Shut down MPI
  MPI_Finalize();
  if (myrank == 0)
  {
    debugPrint(-1, "===== Complete! =====");
  }
  return 0;
}
/** main(int argc, char **argv) **/

/** */
static void debugPrint(int source, string message)
{
#ifdef DEBUG
  time_t seconds = time(NULL);
  cout << "[" << seconds << "]";
  if (source == 0)
  {
    cout << "[Master] ";
  }
  else if (source > 0)
  {
    cout << "[Worker" << source << "] ";
  }
  cout << message << endl;
#endif
}
/** debugPrint(int,string) **/
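/** Message protocol used between the master and worker processes:
 *  - master -> worker: a null-terminated command string (at most BUFFERSIZE
 *    characters), sent with the task id as the message tag
 *  - master -> worker: an empty message tagged EXITTAG, telling the worker to
 *    shut down
 *  - worker -> master: a single int holding the completed task id, tagged
 *    SUCCESSTAG, sent once the shell command has finished
 */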
/** The activity undertaken by the master 'thread' */
static void masterProcess(const char* recipe_xml_path)
{
  debugPrint(0, "starting");
  // 0. Initialize
  debugPrint(0, "initializing");
  // - find out how many processes there are in the default communicator
  int number_of_processes;
  MPI_Comm_size(MPI_COMM_WORLD, &number_of_processes);
  // - sanity check
  if (number_of_processes < 2)
  {
    cerr << "Error! Minimum number of processes is 2" << endl;
  }
  // - remember that '0' is the master processor
  int number_of_workers = number_of_processes - 1;
  // - initialize the pool of idle worker processes...
  stack<int> idle_workers;
  for (int i = 1; i <= number_of_workers; i++)
  {
    idle_workers.push(i);
  }
  // - we also need a queue of tasks ready to be undertaken. We use a queue as
  //   the recipe should be in preferred order of execution - and so might be
  //   slightly more efficient if run in that order
  queue<task_t> ready_tasks;
  // 1. Parse in the 'recipe' that controls what configurations of Greenstone
  //    build can be called and when. We end up with a 'queue' of tasks, each
  //    with a unique id (int), a prerequisite (int) if required, and a shell
  //    command (string).
  debugPrint(0, "parsing recipe");
  recipe_t recipe;
  parseRecipe(recipe_xml_path, &recipe);
  // - go ahead and define some iterators
  recipe_t::iterator recipe_begin_itr = recipe.begin();
  recipe_t::iterator recipe_end_itr = recipe.end();
  recipe_t::iterator recipe_current_itr;
  // - now look through the recipe for tasks that are ready to go immediately
  //   (i.e. have no prerequisite tasks) and add them to the queue of ready tasks
  debugPrint(0, "queueing ready tasks");
  recipe_current_itr = recipe_begin_itr;
  while (recipe_current_itr != recipe_end_itr)
  {
    task_t a_task = *recipe_current_itr;
    if (a_task.prerequisite == -1)
    {
      ready_tasks.push(a_task);
    }
    ++recipe_current_itr;
  }
  // 2. We have a 'pool' of idle workers and a list of ready tasks. Start by
  //    iterating through the tasks assigning them to workers until we have
  //    either run out of actionable tasks (although some tasks may become
  //    actionable once others finish) or have exhausted the pool of workers.
  debugPrint(0, "initial task assignment");
  while (ready_tasks.size() > 0 && idle_workers.size() > 0)
  {
    task_t a_task = ready_tasks.front();
    ready_tasks.pop();
    // - grab an idle worker
    int worker_id = idle_workers.top();
    idle_workers.pop();
    // - create a fixed size buffer to store the command string in
    //   (snprintf guards against commands longer than BUFFERSIZE)
    char buffer[BUFFERSIZE];
    snprintf(buffer, BUFFERSIZE, "%s", a_task.command.c_str());
    // - seed the slaves; send one unit of work to each slave.
    stringstream strstr;
    strstr << " - assigning task " << a_task.id << " to worker " << worker_id;
    debugPrint(0, strstr.str());
    MPI_Send(buffer,            // message buffer containing command
             strlen(buffer)+1,  // command string length
             MPI_CHAR,          // data item is a character array
             worker_id,         // destination process rank
             a_task.id,         // we use the task id as the worktag!
             MPI_COMM_WORLD);   // default communicator
  }
  // - by now we have either a) assigned all the ready jobs to workers
  //   or, b) we've run out of idle workers.
  stringstream str1;
  str1 << ready_tasks.size();
  debugPrint(0, str1.str() + " ready tasks remaining");
  stringstream str2;
  str2 << idle_workers.size();
  debugPrint(0, str2.str() + " idle workers remaining");
  // 3. Assuming we have at least one worker that is busy (so not all the
  //    workers are sitting in the idle pool), we wait/block until we receive
  //    feedback from a worker process (feedback that includes the identifier
  //    of the task just completed). We go through the recipe and queue any
  //    dependent tasks as ready to go, while returning the worker to the idle
  //    pool. We then essentially repeat the process of matching tasks to
  //    workers, trying to exhaust either the queue of ready tasks or the pool
  //    of idle workers. (note: maybe optimize this later)
  while ((int)idle_workers.size() < number_of_workers)
  {
    // - wait until a worker completes and replies
    debugPrint(0, "waiting until some worker process responds");
    int task_id;
    MPI_Status status;
    MPI_Recv(&task_id,        // the identifier of the task completed
             1,               // a single integer expected
             MPI_INT,         // data item is an int
             MPI_ANY_SOURCE,  // receive from any sender
             MPI_ANY_TAG,     // any type of message tag
             MPI_COMM_WORLD,  // default communicator
             &status);        // info about the received message
    // - loop through the tasks, looking for any that were waiting for
    //   this task to be completed, and add them to the ready queue.
    debugPrint(0, "queuing any tasks that are now ready");
    recipe_current_itr = recipe_begin_itr;
    while (recipe_current_itr < recipe_end_itr)
    {
      task_t a_task = *recipe_current_itr;
      if (a_task.prerequisite == task_id)
      {
        ready_tasks.push(a_task);
      }
      ++recipe_current_itr;
    }
    // - status contains the identifier of the worker process...
    int worker_id = status.MPI_SOURCE;
    // - ...which we add to the pool of idle workers
    idle_workers.push(worker_id);
    // - now we try, once again, to match tasks to workers until we run out of
    //   one or the other
    debugPrint(0, "task assignment");
    while (ready_tasks.size() > 0 && idle_workers.size() > 0)
    {
      task_t a_task = ready_tasks.front();
      ready_tasks.pop();
      // - grab an idle worker
      int worker_id = idle_workers.top();
      idle_workers.pop();
      // - create a fixed size buffer to store the command string in
      char buffer[BUFFERSIZE];
      if (snprintf(buffer, BUFFERSIZE, "%s", a_task.command.c_str()) >= BUFFERSIZE)
      {
        cerr << "Error! Failed to write command string into transport buffer (command too long)." << endl;
      }
      // - send the task to the worker
      stringstream strstr;
      strstr << " - assigning task " << a_task.id << " to worker " << worker_id;
      debugPrint(0, strstr.str());
      MPI_Send(buffer,            // message buffer containing command
               strlen(buffer)+1,  // command string length
               MPI_CHAR,          // data item is a character array
               worker_id,         // destination process rank
               a_task.id,         // we use the task id as the worktag!
               MPI_COMM_WORLD);   // default communicator
    }
    stringstream str3;
    str3 << ready_tasks.size();
    debugPrint(0, str3.str() + " ready tasks remaining");
    stringstream str4;
    str4 << idle_workers.size();
    debugPrint(0, str4.str() + " idle workers remaining");
  }
  // - we can do some sanity checking here. For instance, there should be no
  //   tasks left in the ready queue at this point and all workers should be
  //   sitting in the idle pool
  if (ready_tasks.size() > 0)
  {
    cerr << "Error! Processing supposedly complete but tasks are still pending!" << endl;
  }
  if ((int)idle_workers.size() != number_of_workers)
  {
    cerr << "Error! Processing supposedly complete but workers are still busy!" << endl;
  }
  // 4. By now all workers have returned, and should be waiting idle in the
  //    pool. Iterate over all workers telling them to exit by using the
  //    special EXITTAG.
  for (int worker_id = 1; worker_id <= number_of_workers; worker_id++)
  {
    MPI_Send(0,               // we don't intend to do any processing
             0,               // zero data items
             MPI_INT,         // data item is an integer
             worker_id,       // destination process identifier
             EXITTAG,         // message flag indicating workers must exit
             MPI_COMM_WORLD); // default communicator
  }
  // 5. At this point all the slaves have expired and the master process is
  //    complete
  debugPrint(0, "exiting");
}
/** masterProcess(const char *) **/

/** Reads in an XML file, and builds an (ordered) list of tasks which I've
 * termed a recipe.
 * - note: when parsing, code should verify that no command exceeds BUFFERSIZE
 *   characters in length
 */
static void parseRecipe(const char* recipe_xml_path, recipe_t* recipe)
{
#ifdef TINYXML
  TiXmlDocument recipe_xml_doc(recipe_xml_path);
  bool is_loaded = recipe_xml_doc.LoadFile();
  if (is_loaded)
  {
    // - the top node is always the document, so we loop through the root
    //   element's child elements (which should be the no-prerequisite tasks)
    TiXmlElement *root = recipe_xml_doc.RootElement();
    int task_count = 0;
    TiXmlNode* element;
    for (element = root->FirstChild(); element != 0; element = element->NextSibling())
    {
      task_count++;
      task_count = recurseRecipeXML(recipe, element, -1, task_count);
    }
  }
  else
  {
    cerr << "Error! Failed to open/parse XML file: " << recipe_xml_path << endl;
  }
#else
  // - dummy data for a start. We'll emulate a (complex) MG build with five
  //   indexes
  task_t task1;
  task1.id = 1;
  task1.prerequisite = -1;
  task1.command = "buildcol.pl -keepold -verbosity 3 -mode compress_text tdb0005000 >> build.log 2>&1";
  recipe->push_back(task1);
  task_t task2;
  task2.id = 2;
  task2.prerequisite = -1;
  task2.command = "buildcol.pl -keepold -verbosity 3 -mode infodb tdb0005000 >> build.log 2>&1";
  recipe->push_back(task2);
  task_t task3;
  task3.id = 3;
  task3.prerequisite = 1;
  task3.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:text tdb0005000 >> build.log 2>&1";
  recipe->push_back(task3);
  task_t task4;
  task4.id = 4;
  task4.prerequisite = 1;
  task4.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:Title tdb0005000 >> build.log 2>&1";
  recipe->push_back(task4);
  task_t task5;
  task5.id = 5;
  task5.prerequisite = 1;
  task5.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:Source tdb0005000 >> build.log 2>&1";
  recipe->push_back(task5);
  task_t task6;
  task6.id = 6;
  task6.prerequisite = 1;
  task6.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:Title,Source tdb0005000 >> build.log 2>&1";
  recipe->push_back(task6);
  task_t task7;
  task7.id = 7;
  task7.prerequisite = 1;
  task7.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:text,Title,Source tdb0005000 >> build.log 2>&1";
  recipe->push_back(task7);
#endif
#ifdef DEBUG
  // If we are debugging, let's print out the recipe
  debugPrint(0, " * The Recipe!");
  recipe_t::iterator recipe_begin_itr = (*recipe).begin();
  recipe_t::iterator recipe_end_itr = (*recipe).end();
  recipe_t::iterator recipe_current_itr;
  for (recipe_current_itr = recipe_begin_itr; recipe_current_itr < recipe_end_itr; ++recipe_current_itr)
  {
    task_t a_task = (task_t)(*recipe_current_itr);
    stringstream recipestrstr;
    recipestrstr << " Task " << a_task.id << ": " << a_task.command
                 << " [Prerequisite: " << a_task.prerequisite << "]";
    debugPrint(0, recipestrstr.str());
  }
#endif
}
/** parseRecipe(const char*, recipe_t*) **/
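/** For reference, the XML parsing below expects a recipe shaped roughly like
 * the sketch that follows. Only the element names "Task" and "Command" are
 * significant to recurseRecipeXML(); the root element name and the commands
 * shown here are purely illustrative.
 *
 *   <Recipe>
 *     <Task>
 *       <Command>buildcol.pl -keepold -mode compress_text demo</Command>
 *       <Task>
 *         <Command>buildcol.pl -keepold -mode build_index demo</Command>
 *       </Task>
 *     </Task>
 *   </Recipe>
 *
 * Each Task element holds at most one Command, and Task elements nested
 * inside another Task are only queued once the enclosing task has completed
 * (their prerequisite is set to the enclosing task's id).
 */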
/** */
#ifdef TINYXML
static int recurseRecipeXML(recipe_t* recipe, TiXmlNode* element, int parent_id, int task_count)
{
  // A. Loop through this task element's children. At least one subelement
  //    should be the command, while all other subelements should be
  //    tasks that need to be further recursed.
  TiXmlNode* child;
  int task_id = task_count;
  bool found_command = false; // We can only have one command per task id
  for (child = element->FirstChild(); child != 0; child = child->NextSibling())
  {
    // - we are only interested in element nodes
    if (child->Type() == TiXmlNode::TINYXML_ELEMENT)
    {
      stringstream element_name_stream;
      element_name_stream << child->Value();
      string element_name = element_name_stream.str();
      if (element_name == "Command" && !found_command)
      {
        // - retrieve the text child node
        for (TiXmlNode *text_grandchild = child->FirstChild(); text_grandchild != 0; text_grandchild = text_grandchild->NextSibling())
        {
          if (text_grandchild->Type() == TiXmlNode::TINYXML_TEXT)
          {
            TiXmlText* text_command = text_grandchild->ToText();
            stringstream commandstrstr;
            commandstrstr << text_command->Value();
            // - and create the new task given the command string and
            //   accounting for any prerequisites via the parent task id
            //   (-1 meaning no prerequisite)
            task_t a_task;
            a_task.id = task_id;
            a_task.prerequisite = parent_id;
            a_task.command = commandstrstr.str();
            recipe->push_back(a_task);
            // - and prevent any further commands being associated with this
            //   task id
            found_command = true;
          }
        }
      }
      else if (element_name == "Task")
      {
        task_count++;
        // - capture the returned count so nested tasks keep unique ids
        task_count = recurseRecipeXML(recipe, child, task_id, task_count);
      }
    }
  }
  return task_count;
}
#endif
/** recurseRecipeXML(recipe_t*, TiXmlNode*, int, int) **/

/** Each worker is responsible for executing a particular command in a shell,
 * waiting until the command is complete, and then relaying to the master that
 * work is complete.
 * @param worker_id An integer containing the unique identifier (rank) of the
 *                  worker process.
 * @return void
 */
static void workerProcess(int worker_id)
{
  debugPrint(worker_id, "starting");
  // 0. Worker loops tirelessly until asked to exit (note that the MPI_Recv
  //    command below is blocking - so no tight-loop thrashing)
  while (1)
  {
    // 1. Receive a message from the master process
    char command[BUFFERSIZE];
    MPI_Status status;
    MPI_Recv(command,         // buffer in which to store the command string
             BUFFERSIZE,      // we expect (at most) BUFFERSIZE characters
             MPI_CHAR,        // we expect a char array
             0,               // receive from the master process only
             MPI_ANY_TAG,     // accept any incoming tag
             MPI_COMM_WORLD,  // default communicator
             &status);
    // - watch for the special exit tag, and end the worker process if detected
    if (status.MPI_TAG == EXITTAG)
    {
      debugPrint(worker_id, "exiting");
      return;
    }
    // - otherwise the tag actually tells us the task id - which we need to
    //   reply back with later
    int task_id = status.MPI_TAG;
    // - action the command in the buffer
    string message = "processing: ";
    message += command;
    debugPrint(worker_id, message);
    system(command); // blocks until the shell command completes
    // 2. Send a reply back containing the id of the task just completed
    debugPrint(worker_id, "complete");
    MPI_Send(&task_id, 1, MPI_INT, 0, SUCCESSTAG, MPI_COMM_WORLD);
  }
}
/** workerProcess(int) **/

/*
#include "mpi.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

using namespace std;

#define KILOBUF 512
#define MEGABUF 10240

int main( int argc, char *argv [] )
{
  // MPI variables
  int num_tasks;
  int rank;

  if (argc != 3 )
  {
    cerr << "Usage: " << argv[0] << " gsdlhome collection" << endl;
    exit(-1);
  }

  // Greenstone home directory
  char *gsdl_home_dir = argv[1];
  // Short collection name
  char *collection = argv[2];

  // start MPI environment
  int mpi_status = MPI_Init(&argc, &argv);
  if (mpi_status != MPI_SUCCESS)
  {
    printf("Error starting MPI program. Terminating.\n");
    MPI_Abort(MPI_COMM_WORLD, mpi_status);
  }

  // get MPI variables: number of processors and processor number
  MPI_Status stat;
  MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // The Master node (dispatcher) has a rank of zero. All child processes have
  // higher ranks.
  if (rank == 0)
  {
    cout << "===== MPI Build =====" << endl;
    cout << "Uses OpenMPI to allow several Greenstone build passes to be run in parallel." << endl;
    cout << endl;
    cout << "Number of Processors: " << num_tasks << endl;
    cout << "Rank of current process: " << rank << endl;
    cout << endl;
    cout << "[0] Dispatcher" << endl;

    // buffer for acknowledgments
    // - Is there some reason this buffer isn't actually set to be num_tasks in
    //   size?
    char incoming[KILOBUF];
    // Buffer to send tasks
    char buffer[KILOBUF];
    // Request monitor for all tasks
    MPI_Request request[KILOBUF];
    // Status monitor for all tasks
    MPI_Status status[KILOBUF];
    // Number of processors running
    int actual_tasks = 0;

    // Set initial status of all processors to idle
    cout << " - initializing child tasks to idle" << endl;
    for ( int i = 0; i < KILOBUF; i++ )
    {
      incoming[i] = ' ';
    }

    // In the future this would be where the process reads in settings from the
    // collect.cfg to determine how many passes are needed and thus what tasks
    // there are available for children to process. For the moment I'll just
    // hardcode the short three task list that essentially invokes each of the
    // three standard Greenstone build modes - compress_text, build_index and
    // infodb.
    cout << " - populating task list... ";
    // List of pending tasks to undertake
    vector<string> tasks;
    tasks.push_back("infodb");
    tasks.push_back("compress_text");
    tasks.push_back("build_index");
    cout << "found " << tasks.size() << " tasks" << endl;

    // For each pending task
    for (int j = 0; j < tasks.size(); j++)
    {
      // Search for idle processor
      cout << " - searching for idle processor" << endl;
      int dest = 0;
      int found = 0;
      while ( (dest < ( num_tasks - 1 ) ) && ( found == 0 ) )
      {
        if (incoming[dest] == ' ')
        {
          found = 1;
        }
        else
        {
          dest++;
        }
      }
      // If no idle processor, wait for one to become idle
      if (found == 0)
      {
        MPI_Waitany(num_tasks-1, request, &dest, status);
      }

      // Write the task's mode flag as the instruction to the child
      sprintf(buffer, "%s", tasks[j].c_str());
      // Mark processor as busy
      incoming[dest] = 'B';
      cout << " - asking child process to execute: " << tasks[j] << endl;
      // Send out the job to the processor
      MPI_Send(&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
      // I'm guessing there is some kind of 'fork' either here or above. Somehow
      // this command 'waits' for a response from the child thread. In the
      // usual case the response is a single space (' ') that overwrites the
      // 'B' in the incoming string at the position that matches the child
      // thread number. (Presumably you could count the B's in the incoming
      // string to determine running threads... but we don't).
      MPI_Irecv(&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
      cout << "[DEBUG] When does this code actually execute? [cmd: " << buffer << "]" << endl;

      // Update counter of actual tasks
      if (dest > actual_tasks)
      {
        actual_tasks = dest;
      }
    }

    // Wait until all outstanding tasks are completed
    cout << " - waiting for outstanding tasks" << endl;
    int dest;
    for ( int k = 0; k < actual_tasks; k++ )
    {
      MPI_Waitany(actual_tasks, request, &dest, status);
    }

    // Send message to end all processing engines
    cout << " - ask all child processes to terminate" << endl;
    char endstr[5] = "end";
    for ( int l = 1; l < num_tasks; l++ )
    {
      MPI_Send(endstr, 4, MPI_CHAR, l, 1, MPI_COMM_WORLD);
    }
  }
  // Slave node processing
  else
  {
    cout << "[" << rank << "] Child Process" << endl;
    char incoming[KILOBUF];
    do
    {
      // wait for instruction from master
      MPI_Recv(&incoming, KILOBUF, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
      if (strcmp(incoming, "end") != 0)
      {
        // Process a received job
        cout << " + processing command: " << incoming << endl;
        // Create Greenstone buildcol command
        char command[2048];
        sprintf(command, "%s/bin/script/buildcol.pl -keepold -verbosity 3 -mode %s %s", gsdl_home_dir, incoming, collection);
        cout << " + Greenstone buildcol command: " << command << endl;
        // Invoke Greenstone buildcol for the given mode
        system (command);
        char line = ' ';
        // send completed message
        MPI_Send(&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
        cout << " + done [cmd: " << command << "]" << endl;
      }
    }
    // stop when "end" instruction is received
    while (strcmp(incoming, "end") != 0);
    cout << " + child process terminating" << endl;
  }

  // clean up MPI environment
  MPI_Finalize();
  if (0 == rank)
  {
    cout << "Complete!" << endl << endl;
  }
}
*/