source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 26188

Last change on this file since 26188 was 26188, checked in by jmt12, 12 years ago

Fixing bug where I passed the wrong number of arguments to a sprintf (added cd command, but forgot to add gsdlhomedir)

  • Property svn:executable set to *
File size: 10.6 KB
Line 
// Master-worker program to read in a list of files and invoke
// import on each separately using manifest files in Greenstone 3,
// with synchronisation using OpenMPI
//
// Hussein Suleman
// 1 July 2010

// 2011MAR - added a bunch of comments to aid (my) understanding.
//         - made site argument optional to support GS2.
//         - moving manifest writing code into the rank=0 thread. This will
//           remove the artificial limit on epoch size caused by size of
//           message buffer between controller and child threads. [jmt12]
// 2011SEP - changing where the manifest files are written so I have the
//           ability to set up the location as a RAMDisk. [jmt12]
15
16#include "mpi.h"
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <time.h>
21
22#include <fstream>
23#include <iostream>
24#include <sstream>
25#include <string>
26#include <vector>
27
28using namespace std;
29
30//#define HARDAFFINITY 1
31#define BUFFERSIZE 512
32
33int
34main ( int argc, char *argv [] )
35{
36 int numtasks, rank, rc; // MPI variables
37 unsigned long int seconds = 0;
38
39 if (5 != argc && argc != 6 )
40 {
41 fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n");
42 exit(-1);
43 }
44
45 char *filelist = argv[1]; // list of filenames
46 char *epochStr = argv[2]; // number of files per task
47 int epoch = atoi (epochStr);
48 char *gsdlhomedir = argv[3]; // location of import script
49 char *collection = argv[4]; // Greenstone collection
50 char *site = NULL;
51 if (argc == 6)
52 {
53 site = argv[5]; // Greenstone site
54 }
55
56 // start MPI environment
57 rc = MPI_Init(&argc,&argv);
58 if (rc != MPI_SUCCESS)
59 {
60 fprintf(stderr, "Error starting MPI program. Terminating.\n");
61 MPI_Abort(MPI_COMM_WORLD, rc);
62 }
63
64 // We'll handle errors ourselves
65 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
66
67 // get MPI variables: number of processors and processor number
68 MPI_Status stat;
69 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
70 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
71
72 // Get processor name too - important when it could be anywhere in a cluster
73 int name_length;
74 char processor_name[MPI_MAX_PROCESSOR_NAME];
75 MPI_Get_processor_name(processor_name, &name_length);
76
77 // master node processing
78 if (rank == 0)
79 {
80 seconds = time(NULL);
81 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
82 char incoming[BUFFERSIZE]; // buffer for acknowledgments
83 char buffer[BUFFERSIZE]; // buffer to send tasks
84 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
85 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
86 int actualTasks = 0; // number of processors running
87
88 vector<char *> manifest_files; // Keep track of temp manifest filenames
89
90 // open file listing filenames to process
91 ifstream infile;
92 infile.open (filelist);
93 string line;
94
95 // set initial status of all processors to idle
96 fprintf(stderr, "[M] Initializing processor state\n");
97 for ( int j=0; j<BUFFERSIZE; j++ )
98 {
99 incoming[j] = ' ';
100 }
101
102 // scan through contents of file listing
103 int manifest_file_count = 1;
104 fprintf(stderr, "[M] Processing contents of filelist.txt\n");
105 while (!infile.eof ())
106 {
107 // get a filename
108 getline (infile, line);
109
110 if ( line.length() == 0 && !infile.eof() )
111 {
112 fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist);
113 }
114
115 if (line.length() > 0)
116 {
117 // search for idle processor
118 int dest=0;
119 int found = 0;
120 fprintf(stderr, "[M] Searching for idle processor\n");
121 while ((dest<(numtasks-1)) && (found == 0))
122 {
123 if (incoming[dest] == ' ')
124 {
125 found = 1;
126 }
127 else
128 {
129 dest++;
130 }
131 }
132
133 // if no idle processor, wait for one to become idle
134 if (found == 0)
135 {
136 fprintf(stderr, "[M] Waiting for processor to become idle\n");
137 MPI_Waitany (numtasks-1, request, &dest, status);
138 }
139
140 // construct manifest filename
141 fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count);
142 stringstream manifestfilename_strstr;
143 manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
144 string manifestfilename_str = manifestfilename_strstr.str();
145 char *manifestfilename = new char [manifestfilename_str.size() + 1];
146 strcpy(manifestfilename, manifestfilename_str.c_str());
147 ofstream manifestfile;
148 if (manifestfilename != NULL)
149 {
150 // create manifest file
151 manifestfile.open (manifestfilename);
152 if (manifestfile.fail())
153 {
154 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
155 }
156 }
157 else
158 {
159 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
160 }
161 manifestfile << "<Manifest><Index>" << endl;
162
163 // add the first filename to the instruction
164 manifestfile << "<Filename>" << line << "</Filename>" << endl;
165 int epochCounter = epoch;
166
167 // if epoch>1 and more filenames, add more filenames
168 while ((epochCounter > 1) && (!infile.eof ()))
169 {
170 getline (infile, line);
171 if (line.length () > 0)
172 {
173 manifestfile << "<Filename>" << line << "</Filename>" << endl;
174 epochCounter--;
175 }
176 }
177
178 manifestfile << "</Index></Manifest>" << endl;
179 manifestfile.close ();
180
181 fprintf(stderr, "[M0] Manifest file complete\n");
182
183 // Store manifest filepath so we can remove it later
184 manifest_files.push_back(manifestfilename);
185
186 // Send the manifest filename as the instruction
187 fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n");
188 sprintf(buffer, "%s", manifestfilename);
189
190 // mark processors as busy
191 incoming[dest] = 'B';
192 // send out the job to the processor
193 fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1));
194 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
195 // wait for a done acknowledgement
196 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
197 fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1));
198 // update counter of actual tasks
199 if (dest > actualTasks)
200 {
201 actualTasks = dest;
202 fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks);
203 }
204 // increase number of manifest files processed
205 manifest_file_count++;
206 }
207 }
208
209 infile.close();
210
211 // wait until all outstanding tasks are completed
212 fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n");
213 int dest;
214 for ( int k=0; k<actualTasks; k++ )
215 {
216 MPI_Waitany (actualTasks, request, &dest, status);
217 }
218
219 // send message to end all processing engines
220 fprintf(stderr, "[M0] Master asking children to exit\n");
221 char endstr[5] = "end";
222 for ( int i=1; i<numtasks; i++ )
223 {
224 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
225 }
226
227 // Free up manifest files
228 fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n");
229 for ( int i = 0; i < manifest_files.size(); i++)
230 {
231 free(manifest_files[i]);
232 }
233 seconds = time(NULL);
234 fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds);
235 }
236 // worker node processing
237 else
238 {
239 seconds = time(NULL);
240 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
241
242 // Check to see if GSDLHOME exists in the environment (it will on multicore
243 // computer, but won't on compute nodes in a cluster). It will be NULL if
244 // source setup.bash hasn't been run (on this computer).
245 const char * gsdlhometest = getenv("GSDLHOME");
246
247 char incoming[BUFFERSIZE];
248 int counter = 0;
249 do
250 {
251 // wait for instruction from master
252 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
253 if (resval != MPI_SUCCESS)
254 {
255 fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
256 MPI_Abort(MPI_COMM_WORLD, rc);
257 }
258 counter++;
259 fprintf(stderr, "[W%d] Received: %s", rank, incoming);
260 if (strcmp (incoming, "end") != 0)
261 {
262 // process a received job
263 seconds = time(NULL);
264 fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
265 // create Greenstone import command
266 // - incoming now contains the name of the manifest file to process
267 char command[2048];
268#ifdef HARDAFFINITY
269 int cpu = rank - 1;
270 fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu);
271 char affinity[16];
272 sprintf(affinity, "taskset -c %d", cpu);
273#else
274 char affinity[16] = "";
275#endif
276 if (site != NULL && strlen(site) > 0)
277 {
278 if (gsdlhometest != NULL)
279 {
280 sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/gsimport-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter);
281 }
282 else
283 {
284 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/gsimport-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter);
285 }
286 }
287 else if (gsdlhometest != NULL)
288 {
289 sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/gsimport-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter);
290 }
291 else
292 {
293 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/gsimport-W%d-%d.log 2>&1\"", gsdlhomedir, affinity, incoming, collection, rank, counter);
294 }
295
296 fprintf(stderr, "[W%d] system('%s')\n", rank, command);
297
298 // invoke Greenstone import with manifest file
299 system (command);
300 seconds = time(NULL);
301 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
302 char line = ' ';
303 // send completed message
304 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
305 }
306 }
307 while (strcmp (incoming, "end") != 0);
308 // stop when "end" instruction is received
309 seconds = time(NULL);
310 fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
311 }
312 ///err << "Finalizing..." << endl;
313 MPI_Finalize();
314}
Note: See TracBrowser for help on using the repository browser.