source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 35762

Last change on this file since 35762 was 30298, checked in by jmt12, 9 years ago

Correcting the path to manifest files for use in Greenstone 3. Started to think about making this Windows-safe by defining the path separator per OS. However, calls through to Perl et al. will need more work.

  • Property svn:executable set to *
File size: 11.3 KB
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
// 2011MAR - added a bunch of commands to aid (my) understanding.
//         - made the site argument optional to support GS2.
//         - moved the manifest writing code into the rank 0 (master) process.
//           This removes the artificial limit on epoch size caused by the
//           size of the message buffer between the master and worker
//           processes. [jmt12]
// 2011SEP - changed where the manifest files are written so that the
//           location can be set up as a RAMDisk. [jmt12]
15
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

using namespace std;
29
// Uncomment to pin each worker's import.pl run to CPU (rank - 1) via taskset.
//#define HARDAFFINITY 1

// Size (in chars) of the instruction buffer a worker receives from the
// master, so a manifest path plus its terminating NUL must fit in it.
#define BUFFERSIZE 512

// Directory separator used when the master builds manifest file paths.
// Only native Windows builds use '\\'. Cygwin is a POSIX layer whose
// system() runs /bin/sh, which strips unquoted backslashes from the
// -manifest argument, so it must use '/' like any other Unix.
#if defined _WIN32 && !defined __CYGWIN__
const char path_sep = '\\';
#else
const char path_sep = '/';
#endif
38
39int
40main ( int argc, char *argv [] )
41{
42 int numtasks, rank, rc; // MPI variables
43 unsigned long int seconds = 0;
44
45 if (5 != argc && argc != 6 )
46 {
47 fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n");
48 exit(-1);
49 }
50
51 char *filelist = argv[1]; // list of filenames
52 char *epochStr = argv[2]; // number of files per task
53 int epoch = atoi (epochStr);
54 char *gsdlhomedir = argv[3]; // location of import script
55 char *collection = argv[4]; // Greenstone collection
56 char *site = NULL;
57 if (argc == 6)
58 {
59 site = argv[5]; // Greenstone site
60 }
61
62 // start MPI environment
63 rc = MPI_Init(&argc,&argv);
64 if (rc != MPI_SUCCESS)
65 {
66 fprintf(stderr, "Error starting MPI program. Terminating.\n");
67 MPI_Abort(MPI_COMM_WORLD, rc);
68 }
69
70 // We'll handle errors ourselves
71 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
72
73 // get MPI variables: number of processors and processor number
74 MPI_Status stat;
75 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
76 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
77
78 // Get processor name too - important when it could be anywhere in a cluster
79 int name_length;
80 char processor_name[MPI_MAX_PROCESSOR_NAME];
81 MPI_Get_processor_name(processor_name, &name_length);
82
83 // master node processing
84 if (rank == 0)
85 {
86 seconds = time(NULL);
87 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
88 char incoming[BUFFERSIZE]; // buffer for acknowledgments
89 char buffer[BUFFERSIZE]; // buffer to send tasks
90 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
91 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
92 int actualTasks = 0; // number of processors running
93
94 vector<char *> manifest_files; // Keep track of temp manifest filenames
95
96 // open file listing filenames to process
97 ifstream infile;
98 infile.open (filelist);
99 string line;
100
101 // set initial status of all processors to idle
102 fprintf(stderr, "[M] Initializing processor state\n");
103 for ( int j=0; j<BUFFERSIZE; j++ )
104 {
105 incoming[j] = ' ';
106 }
107
108 // scan through contents of file listing
109 int manifest_file_count = 1;
110 fprintf(stderr, "[M] Processing contents of filelist.txt\n");
111 while (!infile.eof ())
112 {
113 // get a filename
114 getline (infile, line);
115
116 if ( line.length() == 0 && !infile.eof() )
117 {
118 fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist);
119 }
120
121 if (line.length() > 0)
122 {
123 // search for idle processor
124 int dest=0;
125 int found = 0;
126 fprintf(stderr, "[M] Searching for idle processor\n");
127 while ((dest<(numtasks-1)) && (found == 0))
128 {
129 if (incoming[dest] == ' ')
130 {
131 found = 1;
132 }
133 else
134 {
135 dest++;
136 }
137 }
138
139 // if no idle processor, wait for one to become idle
140 if (found == 0)
141 {
142 fprintf(stderr, "[M] Waiting for processor to become idle\n");
143 MPI_Waitany (numtasks-1, request, &dest, status);
144 }
145
146 // construct manifest filename
147 fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count);
148 stringstream manifestfilename_strstr;
149 if (site == NULL) {
150 manifestfilename_strstr << gsdlhomedir << path_sep << "collect" << path_sep << collection << path_sep << "tmp" << path_sep << "manifest." << manifest_file_count << ".xml";
151 }
152 else {
153 manifestfilename_strstr << gsdlhomedir << path_sep << "web" << path_sep << "sites" << path_sep << site << path_sep << "collect" << path_sep << collection << path_sep << "tmp" << path_sep << "manifest." << manifest_file_count << ".xml";
154 }
155
156 string manifestfilename_str = manifestfilename_strstr.str();
157 char *manifestfilename = new char [manifestfilename_str.size() + 1];
158 strcpy(manifestfilename, manifestfilename_str.c_str());
159 ofstream manifestfile;
160 if (manifestfilename != NULL)
161 {
162 // create manifest file
163 manifestfile.open (manifestfilename);
164 if (manifestfile.fail())
165 {
166 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
167 }
168 }
169 else
170 {
171 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
172 }
173 manifestfile << "<Manifest version=\"2\"><Index>" << endl;
174
175 // add the first filename to the instruction
176 manifestfile << "<Filename>" << line << "</Filename>" << endl;
177 int epochCounter = epoch;
178
179 // if epoch>1 and more filenames, add more filenames
180 while ((epochCounter > 1) && (!infile.eof ()))
181 {
182 getline (infile, line);
183 if (line.length () > 0)
184 {
185 manifestfile << "<Filename>" << line << "</Filename>" << endl;
186 epochCounter--;
187 }
188 }
189
190 manifestfile << "</Index></Manifest>" << endl;
191 manifestfile.close ();
192
193 fprintf(stderr, "[M0] Manifest file complete\n");
194
195 // Store manifest filepath so we can remove it later
196 manifest_files.push_back(manifestfilename);
197
198 // Send the manifest filename as the instruction
199 fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n");
200 sprintf(buffer, "%s", manifestfilename);
201
202 // mark processors as busy
203 incoming[dest] = 'B';
204 // send out the job to the processor
205 fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1));
206 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
207 // wait for a done acknowledgement
208 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
209 fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1));
210 // update counter of actual tasks
211 if (dest > actualTasks)
212 {
213 actualTasks = dest;
214 fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks);
215 }
216 // increase number of manifest files processed
217 manifest_file_count++;
218 }
219 }
220
221 infile.close();
222
223 // wait until all outstanding tasks are completed
224 fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n");
225 int dest;
226 for ( int k=0; k<actualTasks; k++ )
227 {
228 MPI_Waitany (actualTasks, request, &dest, status);
229 }
230
231 // send message to end all processing engines
232 fprintf(stderr, "[M0] Master asking children to exit\n");
233 char endstr[5] = "end";
234 for ( int i=1; i<numtasks; i++ )
235 {
236 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
237 }
238
239 // Free up manifest files
240 fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n");
241 for ( int i = 0; i < manifest_files.size(); i++)
242 {
243 free(manifest_files[i]);
244 }
245 seconds = time(NULL);
246 fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds);
247 }
248 // worker node processing
249 else
250 {
251 seconds = time(NULL);
252 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
253
254 // Check to see if GSDLHOME exists in the environment (it will on multicore
255 // computer, but won't on compute nodes in a cluster). It will be NULL if
256 // source setup.bash hasn't been run (on this computer).
257 const char * gsdlhometest = getenv("GSDLHOME");
258
259 char incoming[BUFFERSIZE];
260 int counter = 0;
261 do
262 {
263 // wait for instruction from master
264 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
265 if (resval != MPI_SUCCESS)
266 {
267 fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
268 MPI_Abort(MPI_COMM_WORLD, rc);
269 }
270 counter++;
271 fprintf(stderr, "[W%d] Received: %s", rank, incoming);
272 if (strcmp (incoming, "end") != 0)
273 {
274 // process a received job
275 seconds = time(NULL);
276 fprintf(stderr, "[W%d:%lu] Processing: %s (%d)\n", rank, seconds, incoming, counter);
277 // create Greenstone import command
278 // - incoming now contains the name of the manifest file to process
279 char command[2048];
280#ifdef HARDAFFINITY
281 int cpu = rank - 1;
282 fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu);
283 char affinity[16];
284 sprintf(affinity, "taskset -c %d", cpu);
285#else
286 char affinity[16] = "";
287#endif
288
289 // In the commands below, need to consider Windows. In particular:
290 // /dev/null => NUL
291 // /tmp/file.log => %temp%\\file.log
292 // But we'll also have issues with affinity and source bash files
293 if (site != NULL && strlen(site) > 0)
294 {
295 if (gsdlhometest != NULL)
296 {
297 sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/gsimport-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter);
298 }
299 else
300 {
301 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/gsimport-W%d-%d.log 2>&1\"", gsdlhomedir, affinity, incoming, site, collection, rank, counter);
302 }
303 }
304 else if (gsdlhometest != NULL)
305 {
306 sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/gsimport-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter);
307 }
308 else
309 {
310 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/gsimport-W%d-%d.log 2>&1\"", gsdlhomedir, affinity, incoming, collection, rank, counter);
311 }
312
313 fprintf(stderr, "[W%d] system('%s')\n", rank, command);
314
315 // invoke Greenstone import with manifest file
316 system (command);
317 seconds = time(NULL);
318 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
319 char line = ' ';
320 // send completed message
321 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
322 }
323 }
324 while (strcmp (incoming, "end") != 0);
325 // stop when "end" instruction is received
326 seconds = time(NULL);
327 fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
328 }
329 ///err << "Finalizing..." << endl;
330 MPI_Finalize();
331}
Note: See TracBrowser for help on using the repository browser.