source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 27179

Last change on this file since 27179 was 27179, checked in by davidb, 11 years ago

Mods to allow code to run with Greenstone3

  • Property svn:executable set to *
File size: 10.8 KB
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
8// 2011MAR - added a bunch of commands to aid (my) understanding.
9// - made site argument optional to support GS2.
// - moved manifest-writing code into the rank-0 (master) process. This
//   removes the artificial limit on epoch size caused by the size of the
//   message buffer between the controller and worker processes. [jmt12]
13// 2011SEP - changing where the manifest files are written so I have the
14// ability to set up the location as a RAMDisk. [jmt12]
15
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
27
using namespace std;

// Uncomment to prefix each worker's import.pl command with `taskset -c N`,
// pinning worker rank R to CPU (R - 1).
//#define HARDAFFINITY 1

// Size (in chars) of the fixed buffers used for master <-> worker messages.
// A manifest filename sent to a worker must fit in it, including the NUL.
#define BUFFERSIZE 512
32
33int
34main ( int argc, char *argv [] )
35{
36 int numtasks, rank, rc; // MPI variables
37 unsigned long int seconds = 0;
38
39 if (5 != argc && argc != 6 )
40 {
41 fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n");
42 exit(-1);
43 }
44
45 char *filelist = argv[1]; // list of filenames
46 char *epochStr = argv[2]; // number of files per task
47 int epoch = atoi (epochStr);
48 char *gsdlhomedir = argv[3]; // location of import script
49 char *collection = argv[4]; // Greenstone collection
50 char *site = NULL;
51 if (argc == 6)
52 {
53 site = argv[5]; // Greenstone site
54 }
55
56 // start MPI environment
57 rc = MPI_Init(&argc,&argv);
58 if (rc != MPI_SUCCESS)
59 {
60 fprintf(stderr, "Error starting MPI program. Terminating.\n");
61 MPI_Abort(MPI_COMM_WORLD, rc);
62 }
63
64 // We'll handle errors ourselves
65 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
66
67 // get MPI variables: number of processors and processor number
68 MPI_Status stat;
69 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
70 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
71
72 // Get processor name too - important when it could be anywhere in a cluster
73 int name_length;
74 char processor_name[MPI_MAX_PROCESSOR_NAME];
75 MPI_Get_processor_name(processor_name, &name_length);
76
77 // master node processing
78 if (rank == 0)
79 {
80 seconds = time(NULL);
81 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
82 char incoming[BUFFERSIZE]; // buffer for acknowledgments
83 char buffer[BUFFERSIZE]; // buffer to send tasks
84 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
85 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
86 int actualTasks = 0; // number of processors running
87
88 vector<char *> manifest_files; // Keep track of temp manifest filenames
89
90 // open file listing filenames to process
91 ifstream infile;
92 infile.open (filelist);
93 string line;
94
95 // set initial status of all processors to idle
96 fprintf(stderr, "[M] Initializing processor state\n");
97 for ( int j=0; j<BUFFERSIZE; j++ )
98 {
99 incoming[j] = ' ';
100 }
101
102 // scan through contents of file listing
103 int manifest_file_count = 1;
104 fprintf(stderr, "[M] Processing contents of filelist.txt\n");
105 while (!infile.eof ())
106 {
107 // get a filename
108 getline (infile, line);
109
110 if ( line.length() == 0 && !infile.eof() )
111 {
112 fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist);
113 }
114
115 if (line.length() > 0)
116 {
117 // search for idle processor
118 int dest=0;
119 int found = 0;
120 fprintf(stderr, "[M] Searching for idle processor\n");
121 while ((dest<(numtasks-1)) && (found == 0))
122 {
123 if (incoming[dest] == ' ')
124 {
125 found = 1;
126 }
127 else
128 {
129 dest++;
130 }
131 }
132
133 // if no idle processor, wait for one to become idle
134 if (found == 0)
135 {
136 fprintf(stderr, "[M] Waiting for processor to become idle\n");
137 MPI_Waitany (numtasks-1, request, &dest, status);
138 }
139
140 // construct manifest filename
141 fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count);
142 stringstream manifestfilename_strstr;
143 if (site == NULL) {
144 manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
145 }
146 else {
147 manifestfilename_strstr << gsdlhomedir << "/sites/" << site << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
148 }
149
150 string manifestfilename_str = manifestfilename_strstr.str();
151 char *manifestfilename = new char [manifestfilename_str.size() + 1];
152 strcpy(manifestfilename, manifestfilename_str.c_str());
153 ofstream manifestfile;
154 if (manifestfilename != NULL)
155 {
156 // create manifest file
157 manifestfile.open (manifestfilename);
158 if (manifestfile.fail())
159 {
160 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
161 }
162 }
163 else
164 {
165 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
166 }
167 manifestfile << "<Manifest><Index>" << endl;
168
169 // add the first filename to the instruction
170 manifestfile << "<Filename>" << line << "</Filename>" << endl;
171 int epochCounter = epoch;
172
173 // if epoch>1 and more filenames, add more filenames
174 while ((epochCounter > 1) && (!infile.eof ()))
175 {
176 getline (infile, line);
177 if (line.length () > 0)
178 {
179 manifestfile << "<Filename>" << line << "</Filename>" << endl;
180 epochCounter--;
181 }
182 }
183
184 manifestfile << "</Index></Manifest>" << endl;
185 manifestfile.close ();
186
187 fprintf(stderr, "[M0] Manifest file complete\n");
188
189 // Store manifest filepath so we can remove it later
190 manifest_files.push_back(manifestfilename);
191
192 // Send the manifest filename as the instruction
193 fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n");
194 sprintf(buffer, "%s", manifestfilename);
195
196 // mark processors as busy
197 incoming[dest] = 'B';
198 // send out the job to the processor
199 fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1));
200 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
201 // wait for a done acknowledgement
202 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
203 fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1));
204 // update counter of actual tasks
205 if (dest > actualTasks)
206 {
207 actualTasks = dest;
208 fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks);
209 }
210 // increase number of manifest files processed
211 manifest_file_count++;
212 }
213 }
214
215 infile.close();
216
217 // wait until all outstanding tasks are completed
218 fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n");
219 int dest;
220 for ( int k=0; k<actualTasks; k++ )
221 {
222 MPI_Waitany (actualTasks, request, &dest, status);
223 }
224
225 // send message to end all processing engines
226 fprintf(stderr, "[M0] Master asking children to exit\n");
227 char endstr[5] = "end";
228 for ( int i=1; i<numtasks; i++ )
229 {
230 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
231 }
232
233 // Free up manifest files
234 fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n");
235 for ( int i = 0; i < manifest_files.size(); i++)
236 {
237 free(manifest_files[i]);
238 }
239 seconds = time(NULL);
240 fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds);
241 }
242 // worker node processing
243 else
244 {
245 seconds = time(NULL);
246 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
247
248 // Check to see if GSDLHOME exists in the environment (it will on multicore
249 // computer, but won't on compute nodes in a cluster). It will be NULL if
250 // source setup.bash hasn't been run (on this computer).
251 const char * gsdlhometest = getenv("GSDLHOME");
252
253 char incoming[BUFFERSIZE];
254 int counter = 0;
255 do
256 {
257 // wait for instruction from master
258 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
259 if (resval != MPI_SUCCESS)
260 {
261 fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
262 MPI_Abort(MPI_COMM_WORLD, rc);
263 }
264 counter++;
265 fprintf(stderr, "[W%d] Received: %s", rank, incoming);
266 if (strcmp (incoming, "end") != 0)
267 {
268 // process a received job
269 seconds = time(NULL);
270 fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
271 // create Greenstone import command
272 // - incoming now contains the name of the manifest file to process
273 char command[2048];
274#ifdef HARDAFFINITY
275 int cpu = rank - 1;
276 fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu);
277 char affinity[16];
278 sprintf(affinity, "taskset -c %d", cpu);
279#else
280 char affinity[16] = "";
281#endif
282 if (site != NULL && strlen(site) > 0)
283 {
284 if (gsdlhometest != NULL)
285 {
286 sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/gsimport-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter);
287 }
288 else
289 {
290 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/gsimport-W%d-%d.log 2>&1\"", gsdlhomedir, affinity, incoming, site, collection, rank, counter);
291 }
292 }
293 else if (gsdlhometest != NULL)
294 {
295 sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/gsimport-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter);
296 }
297 else
298 {
299 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/gsimport-W%d-%d.log 2>&1\"", gsdlhomedir, affinity, incoming, collection, rank, counter);
300 }
301
302 fprintf(stderr, "[W%d] system('%s')\n", rank, command);
303
304 // invoke Greenstone import with manifest file
305 system (command);
306 seconds = time(NULL);
307 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
308 char line = ' ';
309 // send completed message
310 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
311 }
312 }
313 while (strcmp (incoming, "end") != 0);
314 // stop when "end" instruction is received
315 seconds = time(NULL);
316 fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
317 }
318 ///err << "Finalizing..." << endl;
319 MPI_Finalize();
320}
Note: See TracBrowser for help on using the repository browser.