1 | // Uses OpenMPI to allow several Greenstone build passes to be run in parallel.
|
---|
2 | // Author: John Thompson, 2010NOV18
|
---|
3 | // Adapted from code by: Hussein Suleman, 2010JUL01
|
---|
4 |
|
---|
5 | // 0. Initialization
|
---|
// Comment out to disable debug information
|
---|
7 | #define DEBUG 1
|
---|
8 | // Comment out to disable XML reading
|
---|
9 | #define TINYXML 1
|
---|
10 | // Fixed buffer size - needs to be fixed size to pass between processes
|
---|
11 | #define BUFFERSIZE 256
|
---|
12 | // Special message tags
|
---|
13 | #define EXITTAG 0
|
---|
14 | #define SUCCESSTAG 1
|
---|
15 |
|
---|
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <queue>
#include <sstream>
#include <stack>
#include <string>
#include <vector>

#include <mpi.h>

#ifdef TINYXML
#include "tinyxml.h"
#endif
|
---|
31 |
|
---|
32 | using namespace std;
|
---|
33 |
|
---|
34 | struct task_t
|
---|
35 | {
|
---|
36 | int id;
|
---|
37 | int prerequisite; /* -1 for no prerequisite */
|
---|
38 | string command;
|
---|
39 | };
|
---|
40 | // Lets call an order group of tasks a recipe, shall we?
|
---|
41 | typedef vector<task_t> recipe_t;
|
---|
42 |
|
---|
43 | /* Function Prototypes */
|
---|
44 | static void debugPrint(int,string);
|
---|
45 | static void masterProcess(const char*);
|
---|
46 | static void parseRecipe(const char*, recipe_t*);
|
---|
47 | static void workerProcess(int);
|
---|
48 | #ifdef TINYXML
|
---|
49 | static int recurseRecipeXML(recipe_t*, TiXmlNode*, int, int);
|
---|
50 | #endif
|
---|
51 | /**
|
---|
52 | */
|
---|
53 | int
|
---|
54 | main(int argc, char **argv)
|
---|
55 | {
|
---|
56 | if (argc != 2)
|
---|
57 | {
|
---|
58 | cerr << "usage: mpibuild.cpp <recipe xml>" << endl;
|
---|
59 | return 0;
|
---|
60 | }
|
---|
61 | // 1. Initialize MPI
|
---|
62 | MPI_Init(&argc, &argv);
|
---|
63 | // 2. Find out my identity in the 'world'
|
---|
64 | int myrank;
|
---|
65 | MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
|
---|
66 | if (myrank == 0)
|
---|
67 | {
|
---|
68 | debugPrint(-1,"===== MPIBuild =====");
|
---|
69 | }
|
---|
70 | // - and dispatch to the appropriate chunk of code
|
---|
71 | if (myrank == 0)
|
---|
72 | {
|
---|
73 | masterProcess(argv[1]);
|
---|
74 | }
|
---|
75 | else
|
---|
76 | {
|
---|
77 | workerProcess(myrank);
|
---|
78 | }
|
---|
79 | // Shut down MPI
|
---|
80 | MPI_Finalize();
|
---|
81 | if (myrank == 0)
|
---|
82 | {
|
---|
83 | debugPrint(-1,"===== Complete! =====");
|
---|
84 | }
|
---|
85 | return 0;
|
---|
86 | }
|
---|
87 | /** main(int argc, char **argv) **/
|
---|
88 |
|
---|
89 | /**
|
---|
90 | */
|
---|
91 | static void
|
---|
92 | debugPrint(int source, string message)
|
---|
93 | {
|
---|
94 | #ifdef DEBUG
|
---|
95 | time_t seconds = time (NULL);
|
---|
96 | cout << "[" << seconds << "]";
|
---|
97 | if (source == 0)
|
---|
98 | {
|
---|
99 | cout << "[Master] ";
|
---|
100 | }
|
---|
101 | else if (source > 0)
|
---|
102 | {
|
---|
103 | cout << "[Worker" << source << "] ";
|
---|
104 | }
|
---|
105 | cout << message << endl;
|
---|
106 | #endif
|
---|
107 | }
|
---|
108 | /** debugPrint(int,string) **/
|
---|
109 |
|
---|
110 | /** The activity undertaken by the master 'thread'
|
---|
111 | */
|
---|
112 | static void
|
---|
113 | masterProcess(const char* recipe_xml_path)
|
---|
114 | {
|
---|
115 | debugPrint(0, "starting");
|
---|
116 | // 0. Initialize
|
---|
117 | debugPrint(0, "initializing");
|
---|
118 | // - find out how many processes there are in the default communicator
|
---|
119 | int number_of_processes;
|
---|
120 | MPI_Comm_size(MPI_COMM_WORLD, &number_of_processes);
|
---|
121 | // - sanity check
|
---|
122 | if (number_of_processes < 2)
|
---|
123 | {
|
---|
124 | cerr << "Error! Minimum number of processes is 2" << endl;
|
---|
125 | }
|
---|
126 | // - remember that '0' is the master processor
|
---|
127 | int number_of_workers = number_of_processes - 1;
|
---|
128 | // - initialize the pool of idle worker processes...
|
---|
129 | stack<int> idle_workers;
|
---|
130 | for (int i = 1; i <= number_of_workers; i++)
|
---|
131 | {
|
---|
132 | idle_workers.push(i);
|
---|
133 | }
|
---|
134 | // - we also need a queue of tasks ready to be undertaken. We use a queue as
|
---|
135 | // the recipe should be if preferred order of execution - and so might be
|
---|
136 | // slightly more efficient if run in that order
|
---|
137 | queue<task_t> ready_tasks;
|
---|
138 |
|
---|
139 | // 1. Parse in the 'recipe' that controls what configurations of Greenstone
|
---|
140 | // build can be called and when. We end up with a 'queue' of tasks, each
|
---|
141 | // with a unique id (int), a prequisite (int) if required, and a shell
|
---|
142 | // command (string).
|
---|
143 | debugPrint(0, "parsing recipe");
|
---|
144 | recipe_t recipe;
|
---|
145 | parseRecipe(recipe_xml_path, &recipe);
|
---|
146 | // - go ahead and define some iterators
|
---|
147 | recipe_t::iterator recipe_begin_itr = recipe.begin();
|
---|
148 | recipe_t::iterator recipe_end_itr = recipe.end();
|
---|
149 | recipe_t::iterator recipe_current_itr;
|
---|
150 |
|
---|
151 | // - now look through recipe for tasks that are ready to go immediate (i.e.
|
---|
152 | // have no prerequisite tasks) and add them to the queue of ready tasks
|
---|
153 | debugPrint(0, "queueing ready tasks");
|
---|
154 | recipe_current_itr = recipe_begin_itr;
|
---|
155 | while (recipe_current_itr != recipe_end_itr)
|
---|
156 | {
|
---|
157 | task_t a_task = *recipe_current_itr;
|
---|
158 | if (a_task.prerequisite == -1)
|
---|
159 | {
|
---|
160 | ready_tasks.push(a_task);
|
---|
161 | }
|
---|
162 | ++recipe_current_itr;
|
---|
163 | }
|
---|
164 |
|
---|
165 | // 2. We have a 'pool' of idle workers and a list of ready tasks. Start by
|
---|
166 | // iterating through the tasks assigning them to workers until we have
|
---|
167 | // either run out of actionable tasks (although some tasks may become
|
---|
168 | // actionable once others finish) or have exhausted the pool of workers.
|
---|
169 | debugPrint(0, "initial task assignment");
|
---|
170 | while (ready_tasks.size() > 0 && idle_workers.size() > 0)
|
---|
171 | {
|
---|
172 | task_t a_task = ready_tasks.front();
|
---|
173 | ready_tasks.pop();
|
---|
174 | // - grab an idle worker
|
---|
175 | int worker_id = idle_workers.top();
|
---|
176 | idle_workers.pop();
|
---|
177 | // - create a fixed size buffer to store the command string in
|
---|
178 | char buffer[BUFFERSIZE];
|
---|
179 | sprintf(buffer, "%s", a_task.command.c_str());
|
---|
180 | // - seed the slaves; send one unit of work to each slave.
|
---|
181 | stringstream strstr;
|
---|
182 | strstr << " - assigning task " << a_task.id << " to worker " << worker_id;
|
---|
183 | debugPrint(0, strstr.str());
|
---|
184 | MPI_Send(&buffer, // message buffer containing command
|
---|
185 | strlen(buffer)+1, // command string length
|
---|
186 | MPI_CHAR, // data item is a character array
|
---|
187 | worker_id, // destination process rank
|
---|
188 | a_task.id, // we use the task id as the worktag!
|
---|
189 | MPI_COMM_WORLD); // default communicator
|
---|
190 | }
|
---|
191 | // - by now we have either a) assigned all the ready jobs to workers
|
---|
192 | // or, b) we've run out of idle workers.
|
---|
193 | stringstream str1;
|
---|
194 | str1 << ready_tasks.size();
|
---|
195 | debugPrint(0, str1.str() + " ready tasks remaining");
|
---|
196 | stringstream str2;
|
---|
197 | str2 << idle_workers.size();
|
---|
198 | debugPrint(0, str2.str() + " idle workers remaining");
|
---|
199 |
|
---|
200 | // 3. Assuming we have at least one worker that is busy (so the not all the
|
---|
201 | // workers are sitting in the idle pool), we wait/block until we receive
|
---|
202 | // feedback from a worker process (feedback that includes the identifier
|
---|
203 | // of the task just completed). We go through the recipe and queue any
|
---|
204 | // dependant tasks as ready to go, while returning the worker to the idle
|
---|
205 | // pool. We then essentially repeat the process of matching task to
|
---|
206 | // working trying to exhaust either the queue of ready tasks or of will
|
---|
207 | // slaves. (note: maybe optimize this later)
|
---|
208 | while (idle_workers.size() < number_of_workers)
|
---|
209 | {
|
---|
210 | // - wait until a worker completes and replies
|
---|
211 | debugPrint(0, "waiting until some worker process responds");
|
---|
212 | int task_id;
|
---|
213 | MPI_Status status;
|
---|
214 | MPI_Recv(&task_id, // the identifier of the task completed
|
---|
215 | 1, // a single interger expected
|
---|
216 | MPI_INT, // data item is an int
|
---|
217 | MPI_ANY_SOURCE, // receive from any sender
|
---|
218 | MPI_ANY_TAG, // any type of message tag
|
---|
219 | MPI_COMM_WORLD, // default communicator
|
---|
220 | &status); // info about the received message
|
---|
221 | // - loop through the tasks, looking for any that were waiting for
|
---|
222 | // this task to be completed, and add them to the ready queue.
|
---|
223 | debugPrint(0, "queuing any tasks that are now ready");
|
---|
224 | recipe_current_itr = recipe_begin_itr;
|
---|
225 | while (recipe_current_itr < recipe_end_itr)
|
---|
226 | {
|
---|
227 | task_t a_task = *recipe_current_itr;
|
---|
228 | if (a_task.prerequisite == task_id)
|
---|
229 | {
|
---|
230 | ready_tasks.push(a_task);
|
---|
231 | }
|
---|
232 | ++recipe_current_itr;
|
---|
233 | }
|
---|
234 | // - status contains the identifier of the worker process...
|
---|
235 | int worker_id = status.MPI_SOURCE;
|
---|
236 | // - ...which we add to the pool of idle workers
|
---|
237 | idle_workers.push(worker_id);
|
---|
238 | // - now we try, once again, to match tasks to workers until we run out of
|
---|
239 | // one or the other
|
---|
240 | debugPrint(0, "task assignment");
|
---|
241 | while (ready_tasks.size() > 0 && idle_workers.size() > 0)
|
---|
242 | {
|
---|
243 | task_t a_task = ready_tasks.front();
|
---|
244 | ready_tasks.pop();
|
---|
245 | // - grab an idle worker
|
---|
246 | int worker_id = idle_workers.top();
|
---|
247 | idle_workers.pop();
|
---|
248 | // - create a fixed size buffer to store the command string in
|
---|
249 | char buffer[BUFFERSIZE];
|
---|
250 | if (sprintf(buffer, "%s", a_task.command.c_str()) < 0)
|
---|
251 | {
|
---|
252 | cerr << "Error! Failed to write command string into transport buffer." << endl;
|
---|
253 | }
|
---|
254 | // - send the task to the worker
|
---|
255 | stringstream strstr;
|
---|
256 | strstr << " - assigning task " << a_task.id << " to worker " << worker_id;
|
---|
257 | debugPrint(0, strstr.str());
|
---|
258 | MPI_Send(&buffer, // message buffer containing command
|
---|
259 | strlen(buffer)+1, // command string length
|
---|
260 | MPI_CHAR, // data item is a character array
|
---|
261 | worker_id, // destination process rank
|
---|
262 | a_task.id, // we use the task id as the worktag!
|
---|
263 | MPI_COMM_WORLD); // default communicator
|
---|
264 | }
|
---|
265 | stringstream str3;
|
---|
266 | str3 << ready_tasks.size();
|
---|
267 | debugPrint(0, str3.str() + " ready tasks remaining");
|
---|
268 | stringstream str4;
|
---|
269 | str4 << idle_workers.size();
|
---|
270 | debugPrint(0, str4.str() + " idle workers remaining");
|
---|
271 | }
|
---|
272 | // - we can do some sanity checking here. For instance, there should be no
|
---|
273 | // tasks left in the ready queue at this point and all workers should be
|
---|
274 | // sitting in the idle pool
|
---|
275 | if (ready_tasks.size() > 0)
|
---|
276 | {
|
---|
277 | cerr << "Error! Processing supposedly complete but tasks are still pending!" << endl;
|
---|
278 | }
|
---|
279 | if (idle_workers.size() != number_of_workers)
|
---|
280 | {
|
---|
281 | cerr << "Error! Processing supposedly complete but workers are still busy!" << endl;
|
---|
282 | }
|
---|
283 |
|
---|
284 | // 4. By now all workers have returned, and should we waiting idle in the
|
---|
285 | // pool. Iterate over all workers telling them to exit by using the
|
---|
286 | // special EXITTAG.
|
---|
287 | for (int worker_id = 1; worker_id <= number_of_workers; worker_id++)
|
---|
288 | {
|
---|
289 | MPI_Send(0, // we don't intend to do any processing
|
---|
290 | 0, // zero data items
|
---|
291 | MPI_INT, // data item is an integer
|
---|
292 | worker_id, // destination process identifier
|
---|
293 | EXITTAG, // message flag indicating workers must exit
|
---|
294 | MPI_COMM_WORLD); // default communicator
|
---|
295 | }
|
---|
296 |
|
---|
297 | // 5. At this point all the slaves have expired and the master process is
|
---|
298 | // complete
|
---|
299 | debugPrint(0, "exiting");
|
---|
300 | }
|
---|
301 | /** masterProcess(const char *) **/
|
---|
302 |
|
---|
303 | /** Reads in an XML file, and builds an (ordered) list of tasks which I've
|
---|
304 | * termed a recipe.
|
---|
305 | * - note: when parsing, code should verify that no command exceeds BUFFERSIZE
|
---|
306 | * characters in length
|
---|
307 | */
|
---|
308 | static void
|
---|
309 | parseRecipe(const char* recipe_xml_path, recipe_t* recipe)
|
---|
310 | {
|
---|
311 | #ifdef TINYXML
|
---|
312 | TiXmlDocument recipe_xml_doc(recipe_xml_path);
|
---|
313 | bool is_loaded = recipe_xml_doc.LoadFile();
|
---|
314 | if (is_loaded)
|
---|
315 | {
|
---|
316 | // - top node is always the document (in this case <Recipe>) so we loop
|
---|
317 | // through it's child elements (which should be the no-prequisite tasks)
|
---|
318 | TiXmlElement *root = recipe_xml_doc.RootElement();
|
---|
319 | int task_count = 0;
|
---|
320 | TiXmlNode* element;
|
---|
321 | for ( element = root->FirstChild(); element != 0; element = element->NextSibling())
|
---|
322 | {
|
---|
323 | task_count++;
|
---|
324 | task_count = recurseRecipeXML(recipe, element, -1, task_count);
|
---|
325 | }
|
---|
326 | }
|
---|
327 | else
|
---|
328 | {
|
---|
329 | cerr << "Error! Failed to open/parse XML file: " << recipe_xml_path << endl;
|
---|
330 | }
|
---|
331 |
|
---|
332 | #else
|
---|
333 | // - dummy data for a start. We'll emulate a (complex) MG build with four
|
---|
334 | // indexes
|
---|
335 | task_t task1;
|
---|
336 | task1.id = 1;
|
---|
337 | task1.prerequisite = -1;
|
---|
338 | task1.command = "buildcol.pl -keepold -verbosity 3 -mode compress_text tdb0005000 >> build.log 2>&1";
|
---|
339 | recipe->push_back(task1);
|
---|
340 | task_t task2;
|
---|
341 | task2.id = 2;
|
---|
342 | task2.prerequisite = -1;
|
---|
343 | task2.command = "buildcol.pl -keepold -verbosity 3 -mode infodb tdb0005000 >> build.log 2>&1";
|
---|
344 | recipe->push_back(task2);
|
---|
345 | task_t task3;
|
---|
346 | task3.id = 3;
|
---|
347 | task3.prerequisite = 1;
|
---|
348 | task3.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:text tdb0005000 >> build.log 2>&1";
|
---|
349 | recipe->push_back(task3);
|
---|
350 | task_t task4;
|
---|
351 | task4.id = 4;
|
---|
352 | task4.prerequisite = 1;
|
---|
353 | task4.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:Title tdb0005000 >> build.log 2>&1";
|
---|
354 | recipe->push_back(task4);
|
---|
355 | task_t task5;
|
---|
356 | task5.id = 5;
|
---|
357 | task5.prerequisite = 1;
|
---|
358 | task5.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:Source tdb0005000 >> build.log 2>&1";
|
---|
359 | recipe->push_back(task5);
|
---|
360 | task_t task6;
|
---|
361 | task6.id = 6;
|
---|
362 | task6.prerequisite = 1;
|
---|
363 | task6.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:Title,Source tdb0005000 >> build.log 2>&1";
|
---|
364 | recipe->push_back(task6);
|
---|
365 | task_t task7;
|
---|
366 | task7.id = 7;
|
---|
367 | task7.prerequisite = 1;
|
---|
368 | task7.command = "buildcol.pl -keepold -verbosity 3 -mode build_index -indexname document:text,Title,Source tdb0005000 >> build.log 2>&1";
|
---|
369 | recipe->push_back(task7);
|
---|
370 | #endif
|
---|
371 |
|
---|
372 | #ifdef DEBUG
|
---|
373 | debugPrint(0, " * The Recipe!");
|
---|
374 | // If we are debugging, lets print out the recipe
|
---|
375 | recipe_t::iterator recipe_begin_itr = (*recipe).begin();
|
---|
376 | recipe_t::iterator recipe_end_itr = (*recipe).end();
|
---|
377 | recipe_t::iterator recipe_current_itr;
|
---|
378 | for(recipe_current_itr = recipe_begin_itr; recipe_current_itr < recipe_end_itr; ++recipe_current_itr)
|
---|
379 | {
|
---|
380 | task_t a_task = (task_t)(*recipe_current_itr);
|
---|
381 | stringstream recipestrstr;
|
---|
382 | recipestrstr << " Task " << a_task.id << ": " << a_task.command << " [Prequisite: " << a_task.prerequisite << "]";
|
---|
383 | debugPrint(0, recipestrstr.str());
|
---|
384 | }
|
---|
385 | #endif
|
---|
386 | }
|
---|
387 | /** parseRecipe(const char*, recipe_t*) **/
|
---|
388 |
|
---|
389 | /**
|
---|
390 | */
|
---|
391 | #ifdef TINYXML
|
---|
392 | static int recurseRecipeXML(recipe_t* recipe, TiXmlNode* element, int parent_id, int task_count)
|
---|
393 | {
|
---|
394 | // A. Loop through this task elements children. At least one subelement
|
---|
395 | // should be the command, while all other subelements should be
|
---|
396 | // tasks that need to be further recursed.
|
---|
397 | TiXmlNode* child;
|
---|
398 | int task_id = task_count;
|
---|
399 | bool found_command = false; // We can only have one command per task id
|
---|
400 | for (child = element->FirstChild(); child != 0; child = child->NextSibling())
|
---|
401 | {
|
---|
402 | // - we are only interested in element nodes
|
---|
403 | if (child->Type() == TiXmlNode::TINYXML_ELEMENT)
|
---|
404 | {
|
---|
405 | stringstream element_name_stream;
|
---|
406 | element_name_stream << child->Value();
|
---|
407 | string element_name = element_name_stream.str();
|
---|
408 | if (element_name == "Command" && !found_command)
|
---|
409 | {
|
---|
410 | // - retrieve the text child node
|
---|
411 | for ( TiXmlNode *text_grandchild = child->FirstChild(); text_grandchild != 0; text_grandchild = text_grandchild->NextSibling())
|
---|
412 | {
|
---|
413 | if (text_grandchild->Type() == TiXmlNode::TINYXML_TEXT)
|
---|
414 | {
|
---|
415 | TiXmlText* text_command = text_grandchild->ToText();
|
---|
416 | stringstream commandstrstr;
|
---|
417 | commandstrstr << text_command->Value() << " 2>&1";
|
---|
418 | // - and create the new task given the command string and
|
---|
419 | // accounting for any preqrequisites via the the parent task id
|
---|
420 | // (begin non-zero)
|
---|
421 | task_t a_task;
|
---|
422 | a_task.id = task_id;
|
---|
423 | a_task.prerequisite = parent_id;
|
---|
424 | a_task.command = commandstrstr.str();
|
---|
425 | recipe->push_back(a_task);
|
---|
426 | // - and prevent any further commands being associated with this
|
---|
427 | // task id
|
---|
428 | found_command = true;
|
---|
429 | }
|
---|
430 | }
|
---|
431 | }
|
---|
432 | else if (element_name == "Task")
|
---|
433 | {
|
---|
434 | task_count++;
|
---|
435 | recurseRecipeXML(recipe, child, task_id, task_count);
|
---|
436 | }
|
---|
437 | }
|
---|
438 | }
|
---|
439 | return task_count;
|
---|
440 | }
|
---|
441 | #endif
|
---|
442 | /** recurseRecipeXML(recipe_t*, TiXmlNode*, int, int) **/
|
---|
443 |
|
---|
444 | /** Each worker is responsible for executing a particular command in a shell,
|
---|
445 | * waiting until the command is complete, and then relaying to the master that
|
---|
446 | * work is complete.
|
---|
447 | * @param worker_id An integer containing the unique identifier (rank) of the
|
---|
448 | * worker process.
|
---|
449 | * @return void
|
---|
450 | */
|
---|
451 | static void
|
---|
452 | workerProcess(int worker_id)
|
---|
453 | {
|
---|
454 | debugPrint(worker_id, "starting");
|
---|
455 | // 0. Worker loops tirelessly until asked to exit (note that the mpi_recv
|
---|
456 | // command below is blocking - so no tight-loop thrashing)
|
---|
457 | while (1)
|
---|
458 | {
|
---|
459 | // 1. Receive a message from the master process
|
---|
460 | char command[BUFFERSIZE];
|
---|
461 | MPI_Status status;
|
---|
462 | MPI_Recv(&command, // buffer in which to store the command string
|
---|
463 | BUFFERSIZE, // we expect (at most) BUFFERSIZE characters
|
---|
464 | MPI_CHAR, // we expect a char array
|
---|
465 | 0, // recieve from the master process only
|
---|
466 | MPI_ANY_TAG, // accept any incoming tag
|
---|
467 | MPI_COMM_WORLD, // default communicator
|
---|
468 | &status);
|
---|
469 | // - watch for the special exit tag, and end the worker process if detected
|
---|
470 | if (status.MPI_TAG == EXITTAG)
|
---|
471 | {
|
---|
472 | debugPrint(worker_id, "exiting");
|
---|
473 | return;
|
---|
474 | }
|
---|
475 | // - otherwise the tag actually tells us the task id - which we need to
|
---|
476 | // reply back with later
|
---|
477 | int task_id = status.MPI_TAG;
|
---|
478 | // - action the command in the buffer
|
---|
479 | string message = "processing: ";
|
---|
480 | message += command;
|
---|
481 | debugPrint(worker_id, message);
|
---|
482 | // asynchronous
|
---|
483 | //system(command);
|
---|
484 | FILE *pipe = popen(command, "r");
|
---|
485 | if (!pipe)
|
---|
486 | {
|
---|
487 | cerr << "Warning! Failed to open pipe to command: " << command << endl;
|
---|
488 | }
|
---|
489 | else
|
---|
490 | {
|
---|
491 | char buffer[1024];
|
---|
492 | while(fgets(buffer, sizeof(buffer), pipe) != NULL)
|
---|
493 | {
|
---|
494 | cout << buffer;
|
---|
495 | }
|
---|
496 | pclose(pipe);
|
---|
497 | }
|
---|
498 | // 2. Send a reply back containing the id of the task just completed
|
---|
499 | debugPrint(worker_id, "complete");
|
---|
500 | MPI_Send(&task_id,
|
---|
501 | 1,
|
---|
502 | MPI_INT,
|
---|
503 | 0,
|
---|
504 | SUCCESSTAG,
|
---|
505 | MPI_COMM_WORLD);
|
---|
506 | }
|
---|
507 | }
|
---|
508 | /** workerProcess(int) **/
|
---|
509 |
|
---|
510 |
|
---|
511 |
|
---|
512 | /*
|
---|
513 | #include "mpi.h"
|
---|
514 |
|
---|
515 | #include <stdio.h>
|
---|
516 | #include <stdlib.h>
|
---|
517 |
|
---|
518 | #include <fstream>
|
---|
519 | #include <iostream>
|
---|
520 | #include <string>
|
---|
521 | #include <vector>
|
---|
522 |
|
---|
523 | using namespace std;
|
---|
524 |
|
---|
525 | #define KILOBUF 512
|
---|
526 | #define MEGABUF 10240
|
---|
527 |
|
---|
528 | int
|
---|
529 | main( int argc, char *argv [] )
|
---|
530 | {
|
---|
531 | // MPI variables
|
---|
532 | int num_tasks;
|
---|
533 | int rank;
|
---|
534 |
|
---|
535 | if (argc != 3 )
|
---|
536 | {
|
---|
537 | cerr << "Usage: " << argv[0] << " gsdlhome collection" << endl;
|
---|
538 | exit(-1);
|
---|
539 | }
|
---|
540 |
|
---|
541 | // Greenstone home directory
|
---|
542 | char *gsdl_home_dir = argv[1];
|
---|
543 | // Short collection name
|
---|
544 | char *collection = argv[2];
|
---|
545 |
|
---|
546 | // start MPI environment
|
---|
547 | int mpi_status = MPI_Init(&argc, &argv);
|
---|
548 | if (mpi_status != MPI_SUCCESS)
|
---|
549 | {
|
---|
550 | printf ("Error starting MPI program. Terminating.\n");
|
---|
551 | MPI_Abort(MPI_COMM_WORLD, mpi_status);
|
---|
552 | }
|
---|
553 |
|
---|
554 | // get MPI variables: number of processors and processor number
|
---|
555 | MPI_Status stat;
|
---|
556 | MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
|
---|
557 | MPI_Comm_rank(MPI_COMM_WORLD, &rank);
|
---|
558 |
|
---|
559 | // The Master node (dispatcher) has a rank of zero. All child processes have
|
---|
560 | // higher ranks.
|
---|
561 | if (rank == 0)
|
---|
562 | {
|
---|
563 | cout << "===== MPI Build =====" << endl;
|
---|
564 | cout << "Uses OpenMPI to allow several Greenstone build passes to be run in parallel." << endl;
|
---|
565 | cout << endl;
|
---|
566 | cout << "Number of Processors: " << num_tasks << endl;
|
---|
567 | cout << "Rank of current process: " << rank << endl;
|
---|
568 | cout << endl;
|
---|
569 |
|
---|
570 | cout << "[0] Dispatcher" << endl;
|
---|
571 | // buffer for acknowledgments
|
---|
572 | // - Is there some reason this buffer isn't actually set to be numtasks in
|
---|
573 | // size?
|
---|
574 | char incoming[KILOBUF];
|
---|
575 | // Buffer to send tasks
|
---|
576 | char buffer[KILOBUF];
|
---|
577 | // Request monitor for all tasks
|
---|
578 | MPI_Request request[KILOBUF];
|
---|
579 | // Status monitor for all tasks
|
---|
580 | MPI_Status status[KILOBUF];
|
---|
581 | // Number of processors running
|
---|
582 | int actual_tasks = 0;
|
---|
583 |
|
---|
584 | // Set initial status of all processors to idle
|
---|
585 | cout << " - initializing child tasks to idle" << endl;
|
---|
586 | for ( int i = 0; i < KILOBUF; i++ )
|
---|
587 | {
|
---|
588 | incoming[i] = ' ';
|
---|
589 | }
|
---|
590 |
|
---|
591 | // In the future this would be where the process reads in settings from the
|
---|
592 | // collect.cfg to determine how many passes are needed and thus what tasks
|
---|
593 | // there are available for children to process. For the moment I'll just
|
---|
594 | // hardcode the short three task list that essentially invokes each of the
|
---|
595 | // three standard Greenstone build modes - compress_text, build_indexes and
|
---|
596 | // make_infodb.
|
---|
597 | cout << " - populating task list... ";
|
---|
598 | // List of pending tasks to undertake
|
---|
599 | vector<string> tasks;
|
---|
600 | tasks.push_back("infodb");
|
---|
601 | tasks.push_back("compress_text");
|
---|
602 | tasks.push_back("build_index");
|
---|
603 | cout << "found " << tasks.size() << " tasks" << endl;
|
---|
604 |
|
---|
605 | // For each pending task
|
---|
606 | for(int j = 0; j < tasks.size(); j++)
|
---|
607 | {
|
---|
608 | // Search for idle processor
|
---|
609 | cout << " - searching for idle processor" << endl;
|
---|
610 | int dest=0;
|
---|
611 | int found = 0;
|
---|
612 | while ( (dest < ( num_tasks - 1 ) ) && ( found == 0 ) )
|
---|
613 | {
|
---|
614 | if (incoming[dest] == ' ')
|
---|
615 | {
|
---|
616 | found = 1;
|
---|
617 | }
|
---|
618 | else
|
---|
619 | {
|
---|
620 | dest++;
|
---|
621 | }
|
---|
622 | }
|
---|
623 |
|
---|
624 | // If no idle processor, wait for one to become idle
|
---|
625 | if (found == 0)
|
---|
626 | {
|
---|
627 | MPI_Waitany(num_tasks-1, request, &dest, status);
|
---|
628 | }
|
---|
629 |
|
---|
630 | // Write the tasks mode flag as the instruction to the child
|
---|
631 | sprintf(buffer, "%s", tasks[j].c_str());
|
---|
632 |
|
---|
633 | // Mark processors as busy
|
---|
634 | incoming[dest] = 'B';
|
---|
635 |
|
---|
636 | cout << " - asking child process to execute: " << tasks[j] << endl;
|
---|
637 |
|
---|
638 | // Send out the job to the processor
|
---|
639 | MPI_Send(&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
|
---|
640 |
|
---|
641 | // I'm guess there is some kind of 'fork' either here or above. Somehow
|
---|
642 | // this command 'waits' for a response from the child thread. In the
|
---|
643 | // usual case the response is a single space (' ') that overwrites the
|
---|
644 | // 'B' in the incoming string at the position that matches the child
|
---|
645 | // thread number. (Presumably you could count the B's in the incoming
|
---|
646 | // string to determine running threads... but we don't).
|
---|
647 | MPI_Irecv(&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
|
---|
648 | cout << "[DEBUG] When does this code actually execute? [cmd: " << buffer << "]" << endl;
|
---|
649 |
|
---|
650 | // Update counter of actual tasks
|
---|
651 | if (dest > actual_tasks)
|
---|
652 | {
|
---|
653 | actual_tasks = dest;
|
---|
654 | }
|
---|
655 | }
|
---|
656 |
|
---|
657 | // Wait until all outstanding tasks are completed
|
---|
658 | cout << " - waiting for outstanding tasks" << endl;
|
---|
659 | int dest;
|
---|
660 | for ( int k = 0; k < actual_tasks; k++ )
|
---|
661 | {
|
---|
662 | MPI_Waitany(actual_tasks, request, &dest, status);
|
---|
663 | }
|
---|
664 |
|
---|
665 | // Send message to end all processing engines
|
---|
666 | cout << " - ask all child processes to terminate" << endl;
|
---|
667 | char endstr[5] = "end";
|
---|
668 | for ( int l = 1; l < num_tasks; l++ )
|
---|
669 | {
|
---|
670 | MPI_Send(endstr, 4, MPI_CHAR, l, 1, MPI_COMM_WORLD);
|
---|
671 | }
|
---|
672 | }
|
---|
673 | // Slave node processing
|
---|
674 | else
|
---|
675 | {
|
---|
676 | cout << "[" << rank << "] Child Process" << endl;
|
---|
677 | char incoming[KILOBUF];
|
---|
678 |
|
---|
679 | do
|
---|
680 | {
|
---|
681 | // wait for instruction from master
|
---|
682 | MPI_Recv(&incoming, KILOBUF, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
|
---|
683 | if (strcmp(incoming, "end") != 0)
|
---|
684 | {
|
---|
685 | // Process a received job
|
---|
686 | cout << " + processing command: " << incoming << endl;
|
---|
687 |
|
---|
688 | // Create Greenstone import command
|
---|
689 | char command[2048];
|
---|
690 | sprintf(command, "%s/bin/script/buildcol.pl -keepold -verbosity 3 -mode %s %s", gsdl_home_dir, incoming, collection);
|
---|
691 | cout << " + Greenstone buildcol command: " << command << endl;
|
---|
692 |
|
---|
693 | // Invoke Greenstone import with manifest file
|
---|
694 | system (command);
|
---|
695 |
|
---|
696 | char line = ' ';
|
---|
697 | // send completed message
|
---|
698 | MPI_Send(&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
|
---|
699 | cout << " + done [cmd: " << command << "]" << endl;
|
---|
700 | }
|
---|
701 | }
|
---|
702 | // stop when "end" instruction is received
|
---|
703 | while (strcmp(incoming, "end") != 0);
|
---|
704 |
|
---|
705 | cout << " + child process terminating" << endl;
|
---|
706 | }
|
---|
707 |
|
---|
708 | // clean up MPI environment
|
---|
709 | MPI_Finalize();
|
---|
710 | if (0 == rank)
|
---|
711 | {
|
---|
712 | cout << "Complete!" << endl << endl;
|
---|
713 | }
|
---|
714 | }
|
---|
715 | */
|
---|