root/gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp @ 25839

Revision 25839, 10.6 KB (checked in by jmt12, 8 years ago)

Adding more timing to threads

  • Property svn:executable set to *
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
8// 2011MAR - added a bunch of commands to aid (my) understanding.
9//         - made site argument optional to support GS2.
10//         - moving manifest writing code into the rank=0 thread. This will
11//           remove the artificial limit on epoch size caused by the size of
12//           the message buffer between controller and child threads. [jmt12]
13// 2011SEP - changing where the manifest files are written so I have the
14//           ability to set up the location as a RAMDisk. [jmt12]
15
16#include "mpi.h"
17
18#include <stdio.h>
19#include <stdlib.h>
20#include <time.h>
21
22#include <fstream>
23#include <iostream>
24#include <sstream>
25#include <string>
26#include <vector>
27
28using namespace std;
29
30//#define HARDAFFINITY 1
31#define BUFFERSIZE 512
32
33int
34main ( int argc, char *argv [] )
35{
36  int numtasks, rank, rc;            // MPI variables
37  unsigned long int seconds = 0;
38
39  if (5 != argc && argc != 6 )
40  {
41    fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n");
42    exit(-1);
43  }
44
45  char *filelist = argv[1];          // list of filenames
46  char *epochStr = argv[2];          // number of files per task
47  int epoch = atoi (epochStr);
48  char *gsdlhomedir = argv[3];      // location of import script
49  char *collection = argv[4];        // Greenstone collection
50  char *site = NULL;
51  if (argc == 6)
52  {
53    site = argv[5];              // Greenstone site
54  }
55
56  // start MPI environment
57  rc = MPI_Init(&argc,&argv);
58  if (rc != MPI_SUCCESS)
59  {
60    fprintf(stderr, "Error starting MPI program. Terminating.\n");
61    MPI_Abort(MPI_COMM_WORLD, rc);
62  }
63
64  // We'll handle errors ourselves
65  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
66
67  // get MPI variables: number of processors and processor number
68  MPI_Status stat;
69  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
70  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
71
72  // Get processor name too - important when it could be anywhere in a cluster
73  int name_length;
74  char processor_name[MPI_MAX_PROCESSOR_NAME];
75  MPI_Get_processor_name(processor_name, &name_length);
76
77  // master node processing
78  if (rank == 0)
79  {
80    seconds = time(NULL);
81    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
82    char incoming[BUFFERSIZE];          // buffer for acknowledgments
83    char buffer[BUFFERSIZE];         // buffer to send tasks
84    MPI_Request request[BUFFERSIZE];    // request monitor for all tasks
85    MPI_Status status[BUFFERSIZE];      // status monitor for all tasks
86    int actualTasks = 0;         // number of processors running
87
88    vector<char *> manifest_files; // Keep track of temp manifest filenames
89
90    // open file listing filenames to process
91    ifstream infile;
92    infile.open (filelist);
93    string line;
94
95    // set initial status of all processors to idle
96    fprintf(stderr, "[M] Initializing processor state\n");
97    for ( int j=0; j<BUFFERSIZE; j++ )
98    {
99      incoming[j] = ' ';
100    }
101
102    // scan through contents of file listing
103    int manifest_file_count = 1;
104    fprintf(stderr, "[M] Processing contents of filelist.txt\n");
105    while (!infile.eof ())
106    {
107      // get a filename
108      getline (infile, line);
109
110      if ( line.length() == 0 && !infile.eof() )
111      {
112        fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist);
113      }
114
115      if (line.length() > 0)
116      {
117        // search for idle processor
118        int dest=0;
119        int found = 0;
120        fprintf(stderr, "[M] Searching for idle processor\n");
121        while ((dest<(numtasks-1)) && (found == 0))
122        {
123          if (incoming[dest] == ' ')
124          {
125            found = 1;
126          }
127          else
128          {
129            dest++;
130          }
131        }
132
133        // if no idle processor, wait for one to become idle
134        if (found == 0)
135        {
136          fprintf(stderr, "[M] Waiting for processor to become idle\n");
137          MPI_Waitany (numtasks-1, request, &dest, status);
138        }
139
140        // construct manifest filename
141        fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count);
142    stringstream manifestfilename_strstr;
143    manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
144    string manifestfilename_str = manifestfilename_strstr.str();
145    char *manifestfilename = new char [manifestfilename_str.size() + 1];
146    strcpy(manifestfilename, manifestfilename_str.c_str());
147        ofstream manifestfile;
148        if (manifestfilename != NULL)
149        {
150          // create manifest file
151          manifestfile.open (manifestfilename);
152          if (manifestfile.fail())
153          {
154            fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
155          }
156        }
157        else
158        {
159          fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename);
160        }
161        manifestfile << "<Manifest><Index>" << endl;
162
163        // add the first filename to the instruction
164        manifestfile << "<Filename>" << line << "</Filename>" << endl;
165        int epochCounter = epoch;
166
167        // if epoch>1 and more filenames, add more filenames
168        while ((epochCounter > 1) && (!infile.eof ()))
169        {
170          getline (infile, line);
171          if (line.length () > 0)
172          {
173            manifestfile << "<Filename>" << line << "</Filename>" << endl;
174            epochCounter--;
175          }
176        }
177
178        manifestfile << "</Index></Manifest>" << endl;
179        manifestfile.close ();
180
181        fprintf(stderr, "[M0] Manifest file complete\n");
182
183        // Store manifest filepath so we can remove it later
184        manifest_files.push_back(manifestfilename);
185
186        // Send the manifest filename as the instruction
187        fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n");
188        sprintf(buffer, "%s", manifestfilename);
189
190        // mark processors as busy
191        incoming[dest] = 'B';
192        // send out the job to the processor
193        fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1));
194        MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
195        // wait for a done acknowledgement
196        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
197        fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1));
198        // update counter of actual tasks
199        if (dest > actualTasks)
200        {
201          actualTasks = dest;
202          fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks);
203        }
204    // increase number of manifest files processed
205    manifest_file_count++;
206      }
207    }
208
209    infile.close();
210
211    // wait until all outstanding tasks are completed
212    fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n");
213    int dest;
214    for ( int k=0; k<actualTasks; k++ )
215    {
216      MPI_Waitany (actualTasks, request, &dest, status);
217    }
218
219    // send message to end all processing engines
220    fprintf(stderr, "[M0] Master asking children to exit\n");
221    char endstr[5] = "end";
222    for ( int i=1; i<numtasks; i++ )
223    {
224      MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
225    }
226
227    // Free up manifest files
228    fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n");
229    for ( int i = 0; i < manifest_files.size(); i++)
230    {
231      free(manifest_files[i]);
232    }
233    seconds = time(NULL);
234    fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds);
235  }
236  // worker node processing
237  else
238  {
239    seconds = time(NULL);
240    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
241
242    // Check to see if GSDLHOME exists in the environment (it will on multicore
243    // computer, but won't on compute nodes in a cluster). It will be NULL if
244    // source setup.bash hasn't been run (on this computer).
245    const char * gsdlhometest = getenv("GSDLHOME");
246
247    char incoming[BUFFERSIZE];
248    int counter = 0;
249    do
250    {
251      // wait for instruction from master
252      int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
253      if (resval != MPI_SUCCESS)
254      {
255        fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
256        MPI_Abort(MPI_COMM_WORLD, rc);
257      }
258      counter++;
259      if (strcmp (incoming, "end") != 0)
260      {
261        // process a received job
262        seconds = time(NULL);
263        fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
264        // create Greenstone import command
265        // - incoming now contains the name of the manifest file to process
266        char command[2048];
267#ifdef HARDAFFINITY
268        int cpu = rank - 1;
269        fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu);
270        char affinity[16];
271        sprintf(affinity, "taskset -c %d", cpu);
272#else
273        char affinity[16] = "";
274#endif
275        if (site != NULL && strlen(site) > 0)
276        {
277          if (gsdlhometest != NULL)
278          {
279            sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter);
280          }
281          else
282          {
283            sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter);
284          }
285        }
286        else if (gsdlhometest != NULL)
287        {
288          sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter);
289        }
290        else
291        {
292          sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1\"", affinity, gsdlhomedir, incoming, collection, rank, counter);
293        }
294
295        fprintf(stderr, "[W%d] system('%s')\n", rank, command);
296
297        // invoke Greenstone import with manifest file
298        system (command);
299        seconds = time(NULL);
300        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
301        char line = ' ';
302        // send completed message
303        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
304      }
305    }
306    while (strcmp (incoming, "end") != 0);
307    // stop when "end" instruction is received
308    seconds = time(NULL);
309    fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
310  }
311  ///err << "Finalizing..." << endl;
312  MPI_Finalize();
313}
Note: See TracBrowser for help on using the browser.