source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 25839

Last change on this file since 25839 was 25839, checked in by jmt12, 12 years ago

Adding more timing to threads

  • Property svn:executable set to *
File size: 10.6 KB
RevLine 
// Master-worker program to read in a list of files and invoke
// import on each separately using manifest files in Greenstone 3,
// with synchronisation using OpenMPI
//
// Hussein Suleman
// 1 July 2010

// 2011MAR - added a bunch of commands to aid (my) understanding.
//         - made site argument optional to support GS2.
//         - moving manifest writing code into the rank=0 thread. This will
//           remove the artificial limit on epoch size caused by size of
//           message buffer between controller and child threads. [jmt12]
// 2011SEP - changing where the manifest files are written so I have the
//           ability to set up the location as a RAMDisk. [jmt12]
[24589]15
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
27
using namespace std;

// When defined, each worker prefixes its import command with
// "taskset -c <rank - 1>" so the import process is pinned to one CPU.
//#define HARDAFFINITY 1

// Capacity of the fixed-size buffers used by this program, including the
// buffer in which a worker receives the manifest path sent by the master.
#define BUFFERSIZE 512
32
33int
34main ( int argc, char *argv [] )
35{
36 int numtasks, rank, rc; // MPI variables
[25839]37 unsigned long int seconds = 0;
[24589]38
39 if (5 != argc && argc != 6 )
40 {
[25839]41 fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n");
[24589]42 exit(-1);
43 }
44
45 char *filelist = argv[1]; // list of filenames
46 char *epochStr = argv[2]; // number of files per task
47 int epoch = atoi (epochStr);
48 char *gsdlhomedir = argv[3]; // location of import script
49 char *collection = argv[4]; // Greenstone collection
[25839]50 char *site = NULL;
[24589]51 if (argc == 6)
52 {
53 site = argv[5]; // Greenstone site
54 }
55
56 // start MPI environment
57 rc = MPI_Init(&argc,&argv);
58 if (rc != MPI_SUCCESS)
59 {
[25839]60 fprintf(stderr, "Error starting MPI program. Terminating.\n");
[24589]61 MPI_Abort(MPI_COMM_WORLD, rc);
62 }
63
64 // We'll handle errors ourselves
65 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
66
67 // get MPI variables: number of processors and processor number
68 MPI_Status stat;
69 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
70 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
71
[25839]72 // Get processor name too - important when it could be anywhere in a cluster
73 int name_length;
74 char processor_name[MPI_MAX_PROCESSOR_NAME];
75 MPI_Get_processor_name(processor_name, &name_length);
76
[24589]77 // master node processing
78 if (rank == 0)
79 {
[25839]80 seconds = time(NULL);
81 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
[24589]82 char incoming[BUFFERSIZE]; // buffer for acknowledgments
83 char buffer[BUFFERSIZE]; // buffer to send tasks
84 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
85 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
86 int actualTasks = 0; // number of processors running
87
88 vector<char *> manifest_files; // Keep track of temp manifest filenames
89
90 // open file listing filenames to process
91 ifstream infile;
92 infile.open (filelist);
93 string line;
94
95 // set initial status of all processors to idle
[25839]96 fprintf(stderr, "[M] Initializing processor state\n");
[24589]97 for ( int j=0; j<BUFFERSIZE; j++ )
98 {
99 incoming[j] = ' ';
100 }
101
102 // scan through contents of file listing
[24699]103 int manifest_file_count = 1;
[25839]104 fprintf(stderr, "[M] Processing contents of filelist.txt\n");
[24589]105 while (!infile.eof ())
106 {
107 // get a filename
108 getline (infile, line);
[25839]109
110 if ( line.length() == 0 && !infile.eof() )
111 {
112 fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist);
113 }
114
[24589]115 if (line.length() > 0)
116 {
117 // search for idle processor
118 int dest=0;
119 int found = 0;
[25839]120 fprintf(stderr, "[M] Searching for idle processor\n");
[24589]121 while ((dest<(numtasks-1)) && (found == 0))
122 {
123 if (incoming[dest] == ' ')
124 {
125 found = 1;
126 }
127 else
128 {
129 dest++;
130 }
131 }
132
133 // if no idle processor, wait for one to become idle
134 if (found == 0)
135 {
[25839]136 fprintf(stderr, "[M] Waiting for processor to become idle\n");
[24589]137 MPI_Waitany (numtasks-1, request, &dest, status);
138 }
139
140 // construct manifest filename
[25839]141 fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count);
[24699]142 stringstream manifestfilename_strstr;
143 manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
144 string manifestfilename_str = manifestfilename_strstr.str();
145 char *manifestfilename = new char [manifestfilename_str.size() + 1];
146 strcpy(manifestfilename, manifestfilename_str.c_str());
[24589]147 ofstream manifestfile;
148 if (manifestfilename != NULL)
149 {
150 // create manifest file
151 manifestfile.open (manifestfilename);
152 if (manifestfile.fail())
153 {
[25839]154 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
[24589]155 }
156 }
157 else
158 {
[25839]159 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
[24589]160 }
161 manifestfile << "<Manifest><Index>" << endl;
162
163 // add the first filename to the instruction
164 manifestfile << "<Filename>" << line << "</Filename>" << endl;
165 int epochCounter = epoch;
166
167 // if epoch>1 and more filenames, add more filenames
168 while ((epochCounter > 1) && (!infile.eof ()))
169 {
170 getline (infile, line);
171 if (line.length () > 0)
172 {
173 manifestfile << "<Filename>" << line << "</Filename>" << endl;
174 epochCounter--;
175 }
176 }
177
178 manifestfile << "</Index></Manifest>" << endl;
179 manifestfile.close ();
180
[25839]181 fprintf(stderr, "[M0] Manifest file complete\n");
[24833]182
[24589]183 // Store manifest filepath so we can remove it later
184 manifest_files.push_back(manifestfilename);
185
186 // Send the manifest filename as the instruction
[25839]187 fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n");
[24589]188 sprintf(buffer, "%s", manifestfilename);
189
190 // mark processors as busy
191 incoming[dest] = 'B';
192 // send out the job to the processor
[25839]193 fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1));
[24589]194 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
195 // wait for a done acknowledgement
196 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
[25839]197 fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1));
[24589]198 // update counter of actual tasks
199 if (dest > actualTasks)
200 {
201 actualTasks = dest;
[25839]202 fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks);
[24589]203 }
[24699]204 // increase number of manifest files processed
205 manifest_file_count++;
[24589]206 }
207 }
208
209 infile.close();
210
211 // wait until all outstanding tasks are completed
[25839]212 fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n");
[24589]213 int dest;
214 for ( int k=0; k<actualTasks; k++ )
215 {
216 MPI_Waitany (actualTasks, request, &dest, status);
217 }
218
219 // send message to end all processing engines
[25839]220 fprintf(stderr, "[M0] Master asking children to exit\n");
[24589]221 char endstr[5] = "end";
222 for ( int i=1; i<numtasks; i++ )
223 {
224 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
225 }
226
227 // Free up manifest files
[25839]228 fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n");
[24589]229 for ( int i = 0; i < manifest_files.size(); i++)
230 {
231 free(manifest_files[i]);
232 }
[25839]233 seconds = time(NULL);
234 fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds);
[24589]235 }
[25839]236 // worker node processing
[24589]237 else
238 {
[25839]239 seconds = time(NULL);
240 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
241
242 // Check to see if GSDLHOME exists in the environment (it will on multicore
243 // computer, but won't on compute nodes in a cluster). It will be NULL if
244 // source setup.bash hasn't been run (on this computer).
245 const char * gsdlhometest = getenv("GSDLHOME");
246
[24589]247 char incoming[BUFFERSIZE];
[24833]248 int counter = 0;
[24589]249 do
250 {
251 // wait for instruction from master
252 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
253 if (resval != MPI_SUCCESS)
254 {
[25839]255 fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
[24589]256 MPI_Abort(MPI_COMM_WORLD, rc);
257 }
[24833]258 counter++;
[24589]259 if (strcmp (incoming, "end") != 0)
260 {
261 // process a received job
[25839]262 seconds = time(NULL);
263 fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
[24589]264 // create Greenstone import command
265 // - incoming now contains the name of the manifest file to process
266 char command[2048];
[24833]267#ifdef HARDAFFINITY
268 int cpu = rank - 1;
[25839]269 fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu);
270 char affinity[16];
271 sprintf(affinity, "taskset -c %d", cpu);
272#else
273 char affinity[16] = "";
274#endif
275 if (site != NULL && strlen(site) > 0)
[24589]276 {
[25839]277 if (gsdlhometest != NULL)
278 {
279 sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter);
280 }
281 else
282 {
283 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter);
284 }
[24589]285 }
[25839]286 else if (gsdlhometest != NULL)
[24589]287 {
[25839]288 sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter);
[24589]289 }
[24833]290 else
291 {
[25839]292 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1\"", affinity, gsdlhomedir, incoming, collection, rank, counter);
[24833]293 }
[24589]294
[25839]295 fprintf(stderr, "[W%d] system('%s')\n", rank, command);
[24833]296
[24589]297 // invoke Greenstone import with manifest file
298 system (command);
[25839]299 seconds = time(NULL);
300 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
[24589]301 char line = ' ';
302 // send completed message
303 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
304 }
305 }
306 while (strcmp (incoming, "end") != 0);
307 // stop when "end" instruction is received
[25839]308 seconds = time(NULL);
309 fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
[24589]310 }
[25839]311 ///err << "Finalizing..." << endl;
[24589]312 MPI_Finalize();
313}
Note: See TracBrowser for help on using the repository browser.