source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 24833

Last change on this file since 24833 was 24833, checked in by jmt12, 12 years ago

Adding support for setting affinity of worker processes. Lots of debug statements (due to 1M doc parallel build issue)

  • Property svn:executable set to *
File size: 9.3 KB
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
8// 2011MAR - added a bunch of commands to aid (my) understanding.
9// - made site argument optional to support GS2.
10// - moved manifest writing code into the rank 0 (master) process. This
11// removes the artificial limit on epoch size caused by the size of the
12// message buffer between the controller and worker processes. [jmt12]
13// 2011SEP - changing where the manifest files are written so I have the
14// ability to set up the location as a RAMDisk. [jmt12]
15
16#include "mpi.h"
17
18#include <stdio.h>
19#include <stdlib.h>
20
21#include <fstream>
22#include <iostream>
23#include <sstream>
24#include <string>
25#include <vector>
26
// The code below uses std:: names (cerr, endl, string, ifstream, ...) unqualified.
using namespace std;

// Uncomment to launch each worker's import.pl under `taskset -c <rank - 1>`.
//#define HARDAFFINITY 1

// Buffer length constant, in chars.
static const int BUFFERSIZE = 512;
31
32int
33main ( int argc, char *argv [] )
34{
35 int numtasks, rank, rc; // MPI variables
36
37 if (5 != argc && argc != 6 )
38 {
39 cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;
40 exit(-1);
41 }
42
43 char *filelist = argv[1]; // list of filenames
44 char *epochStr = argv[2]; // number of files per task
45 int epoch = atoi (epochStr);
46 char *gsdlhomedir = argv[3]; // location of import script
47 char *collection = argv[4]; // Greenstone collection
48 char *site = "";
49 if (argc == 6)
50 {
51 site = argv[5]; // Greenstone site
52 }
53
54 // start MPI environment
55 rc = MPI_Init(&argc,&argv);
56 if (rc != MPI_SUCCESS)
57 {
58 printf ("Error starting MPI program. Terminating.\n");
59 MPI_Abort(MPI_COMM_WORLD, rc);
60 }
61
62 // We'll handle errors ourselves
63 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
64
65 // get MPI variables: number of processors and processor number
66 MPI_Status stat;
67 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
68 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
69
70 // master node processing
71 if (rank == 0)
72 {
73 cerr << " * Master Starting" << endl;
74 char incoming[BUFFERSIZE]; // buffer for acknowledgments
75 char buffer[BUFFERSIZE]; // buffer to send tasks
76 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
77 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
78 int actualTasks = 0; // number of processors running
79
80 vector<char *> manifest_files; // Keep track of temp manifest filenames
81
82 // open file listing filenames to process
83 ifstream infile;
84 infile.open (filelist);
85 string line;
86
87 // set initial status of all processors to idle
88 cerr << " - initializing processor state" << endl;
89 for ( int j=0; j<BUFFERSIZE; j++ )
90 {
91 incoming[j] = ' ';
92 }
93
94 // scan through contents of file listing
95 int manifest_file_count = 1;
96 cerr << " - processing contents of filelist.txt" << endl;
97 while (!infile.eof ())
98 {
99 // get a filename
100 getline (infile, line);
101 if (line.length() > 0)
102 {
103 // search for idle processor
104 int dest=0;
105 int found = 0;
106 cerr << " - searching for idle processor" << endl;
107 while ((dest<(numtasks-1)) && (found == 0))
108 {
109 if (incoming[dest] == ' ')
110 {
111 found = 1;
112 }
113 else
114 {
115 dest++;
116 }
117 }
118
119 // if no idle processor, wait for one to become idle
120 if (found == 0)
121 {
122 cerr << " - waiting for processor to become idle" << endl;
123 MPI_Waitany (numtasks-1, request, &dest, status);
124 }
125
126 // construct manifest filename
127 cerr << " - creating manifest file: number " << manifest_file_count << endl;
128 stringstream manifestfilename_strstr;
129 manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
130 string manifestfilename_str = manifestfilename_strstr.str();
131 char *manifestfilename = new char [manifestfilename_str.size() + 1];
132 strcpy(manifestfilename, manifestfilename_str.c_str());
133 ofstream manifestfile;
134 if (manifestfilename != NULL)
135 {
136 // create manifest file
137 manifestfile.open (manifestfilename);
138 if (manifestfile.fail())
139 {
140 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
141 }
142 }
143 else
144 {
145 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
146 }
147 manifestfile << "<Manifest><Index>" << endl;
148
149 // add the first filename to the instruction
150 manifestfile << "<Filename>" << line << "</Filename>" << endl;
151 int epochCounter = epoch;
152
153 // if epoch>1 and more filenames, add more filenames
154 while ((epochCounter > 1) && (!infile.eof ()))
155 {
156 getline (infile, line);
157 if (line.length () > 0)
158 {
159 manifestfile << "<Filename>" << line << "</Filename>" << endl;
160 epochCounter--;
161 }
162 }
163
164 manifestfile << "</Index></Manifest>" << endl;
165 manifestfile.close ();
166
167 cerr << " - manifest file complete" << endl;
168
169 // Store manifest filepath so we can remove it later
170 manifest_files.push_back(manifestfilename);
171
172 // Send the manifest filename as the instruction
173 cerr << " - writing manifest filename as instruction to worker" << endl;
174 sprintf(buffer, "%s", manifestfilename);
175
176 // mark processors as busy
177 incoming[dest] = 'B';
178 // send out the job to the processor
179 cerr << " - asking worker to start" << endl;
180 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
181 // wait for a done acknowledgement
182 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
183 cerr << " - worker replied that it has started" << endl;
184 // update counter of actual tasks
185 if (dest > actualTasks)
186 {
187 actualTasks = dest;
188 cerr << " - increased the number of running workers to: " << actualTasks << endl;
189 }
190 // increase number of manifest files processed
191 manifest_file_count++;
192 }
193 }
194
195 infile.close();
196
197 // wait until all outstanding tasks are completed
198 cerr << " - waiting for all outstanding tasks to complete" << endl;
199 int dest;
200 for ( int k=0; k<actualTasks; k++ )
201 {
202 MPI_Waitany (actualTasks, request, &dest, status);
203 }
204
205 // send message to end all processing engines
206 cerr << " * Master asking children to exit" << endl;
207 char endstr[5] = "end";
208 for ( int i=1; i<numtasks; i++ )
209 {
210 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
211 }
212
213 // Free up manifest files
214 cerr << " - freeing shared memory used by manifest files" << endl;
215 for ( int i = 0; i < manifest_files.size(); i++)
216 {
217 free(manifest_files[i]);
218 }
219 cerr << " * Master Exiting" << endl;
220 }
221 // slave node processing
222 else
223 {
224 ///out << "Worker Starting" << endl;
225 char incoming[BUFFERSIZE];
226
227 int counter = 0;
228
229 do
230 {
231 // wait for instruction from master
232 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
233 if (resval != MPI_SUCCESS)
234 {
235 printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
236 MPI_Abort(MPI_COMM_WORLD, rc);
237 }
238 counter++;
239 if (strcmp (incoming, "end") != 0)
240 {
241 // process a received job
242 cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl;
243
244 // create Greenstone import command
245 // - incoming now contains the name of the manifest file to process
246 char command[2048];
247
248#ifdef HARDAFFINITY
249 int cpu = rank - 1;
250 cerr << "Setting affinity for worker " << rank << " to cpu " << cpu << endl;
251 if (site != "")
252 {
253 sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", cpu, gsdlhomedir, incoming, site, collection);
254 }
255 else
256 {
257 sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", cpu, gsdlhomedir, incoming, collection);
258 }
259#else
260 if (site != "")
261 {
262 sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection);
263 }
264 else
265 {
266 //sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", gsdlhomedir, incoming, collection);
267 sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s > %s/collect/%s/logs/import-p%d-%d.log 2>&1", gsdlhomedir, incoming, collection, gsdlhomedir, collection, rank, counter);
268 }
269#endif
270
271 cerr << "**** cmd = " << command << endl;
272
273 // invoke Greenstone import with manifest file
274 system (command);
275 cerr << "**** complete" << endl;
276 char line = ' ';
277 // send completed message
278 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
279 }
280 }
281 while (strcmp (incoming, "end") != 0);
282 // stop when "end" instruction is received
283 cerr << "Worker Exiting" << endl;
284 }
285
286 // clean up MPI environment
287 cerr << "Finalizing..." << endl;
288 MPI_Finalize();
289}
Note: See TracBrowser for help on using the repository browser.