root/gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp @ 24833

Revision 24833, 9.3 KB (checked in by jmt12, 9 years ago)

Adding support for setting affinity of worker processes. Lots of debug statements (due to 1M doc parallel build issue)

  • Property svn:executable set to *
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
8// 2011MAR - added a bunch of commands to aid (my) understanding.
9//         - made site argument optional to support GS2.
10//         - moving manifest writing code into the rank=0 thread. This will
11//           remove the artificial limit on epoc size caused by size of
12//           message buffer between controller and child threads. [jmt12]
13// 2011SEP - changing where the manifest files are written so I have the
14//           ability to set up the location as a RAMDisk. [jmt12]
15
16#include "mpi.h"
17
18#include <stdio.h>
19#include <stdlib.h>
20
21#include <fstream>
22#include <iostream>
23#include <sstream>
24#include <string>
25#include <vector>
26
27using namespace std;
28
29//#define HARDAFFINITY 1
30#define BUFFERSIZE 512
31
32int
33main ( int argc, char *argv [] )
34{
35  int numtasks, rank, rc;            // MPI variables
36
37  if (5 != argc && argc != 6 )
38  {
39    cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;
40    exit(-1);
41  }
42
43  char *filelist = argv[1];          // list of filenames
44  char *epochStr = argv[2];          // number of files per task
45  int epoch = atoi (epochStr);
46  char *gsdlhomedir = argv[3];      // location of import script
47  char *collection = argv[4];        // Greenstone collection
48  char *site = "";
49  if (argc == 6)
50  {
51    site = argv[5];              // Greenstone site
52  }
53
54  // start MPI environment
55  rc = MPI_Init(&argc,&argv);
56  if (rc != MPI_SUCCESS)
57  {
58    printf ("Error starting MPI program. Terminating.\n");
59    MPI_Abort(MPI_COMM_WORLD, rc);
60  }
61
62  // We'll handle errors ourselves
63  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
64
65  // get MPI variables: number of processors and processor number
66  MPI_Status stat;
67  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
68  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
69
70  // master node processing
71  if (rank == 0)
72  {
73    cerr << " * Master Starting" << endl;
74    char incoming[BUFFERSIZE];          // buffer for acknowledgments
75    char buffer[BUFFERSIZE];         // buffer to send tasks
76    MPI_Request request[BUFFERSIZE];    // request monitor for all tasks
77    MPI_Status status[BUFFERSIZE];      // status monitor for all tasks
78    int actualTasks = 0;         // number of processors running
79
80    vector<char *> manifest_files; // Keep track of temp manifest filenames
81
82    // open file listing filenames to process
83    ifstream infile;
84    infile.open (filelist);
85    string line;
86
87    // set initial status of all processors to idle
88    cerr << " - initializing processor state" << endl;
89    for ( int j=0; j<BUFFERSIZE; j++ )
90    {
91      incoming[j] = ' ';
92    }
93
94    // scan through contents of file listing
95    int manifest_file_count = 1;
96    cerr << " - processing contents of filelist.txt" << endl;
97    while (!infile.eof ())
98    {
99      // get a filename
100      getline (infile, line);
101      if (line.length() > 0)
102      {
103        // search for idle processor
104        int dest=0;
105        int found = 0;
106        cerr << " - searching for idle processor" << endl;
107        while ((dest<(numtasks-1)) && (found == 0))
108        {
109          if (incoming[dest] == ' ')
110          {
111            found = 1;
112          }
113          else
114          {
115            dest++;
116          }
117        }
118
119        // if no idle processor, wait for one to become idle
120        if (found == 0)
121        {
122          cerr << " - waiting for processor to become idle" << endl;
123          MPI_Waitany (numtasks-1, request, &dest, status);
124        }
125
126        // construct manifest filename
127        cerr << " - creating manifest file: number " << manifest_file_count << endl;
128    stringstream manifestfilename_strstr;
129    manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
130    string manifestfilename_str = manifestfilename_strstr.str();
131    char *manifestfilename = new char [manifestfilename_str.size() + 1];
132    strcpy(manifestfilename, manifestfilename_str.c_str());
133        ofstream manifestfile;
134        if (manifestfilename != NULL)
135        {
136          // create manifest file
137          manifestfile.open (manifestfilename);
138          if (manifestfile.fail())
139          {
140            cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
141          }
142        }
143        else
144        {
145          cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
146        }
147        manifestfile << "<Manifest><Index>" << endl;
148
149        // add the first filename to the instruction
150        manifestfile << "<Filename>" << line << "</Filename>" << endl;
151        int epochCounter = epoch;
152
153        // if epoch>1 and more filenames, add more filenames
154        while ((epochCounter > 1) && (!infile.eof ()))
155        {
156          getline (infile, line);
157          if (line.length () > 0)
158          {
159            manifestfile << "<Filename>" << line << "</Filename>" << endl;
160            epochCounter--;
161          }
162        }
163
164        manifestfile << "</Index></Manifest>" << endl;
165        manifestfile.close ();
166
167        cerr << " - manifest file complete" << endl;
168
169        // Store manifest filepath so we can remove it later
170        manifest_files.push_back(manifestfilename);
171
172        // Send the manifest filename as the instruction
173        cerr << " - writing manifest filename as instruction to worker" << endl;
174        sprintf(buffer, "%s", manifestfilename);
175
176        // mark processors as busy
177        incoming[dest] = 'B';
178        // send out the job to the processor
179        cerr << " - asking worker to start" << endl;
180        MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
181        // wait for a done acknowledgement
182        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
183        cerr << " - worker replied that it has started" << endl;
184        // update counter of actual tasks
185        if (dest > actualTasks)
186        {
187          actualTasks = dest;
188          cerr << " - increased the number of running workers to: " << actualTasks << endl;
189        }
190    // increase number of manifest files processed
191    manifest_file_count++;
192      }
193    }
194
195    infile.close();
196
197    // wait until all outstanding tasks are completed
198    cerr << " - waiting for all outstanding tasks to complete" << endl;
199    int dest;
200    for ( int k=0; k<actualTasks; k++ )
201    {
202      MPI_Waitany (actualTasks, request, &dest, status);
203    }
204
205    // send message to end all processing engines
206    cerr << " * Master asking children to exit" << endl;
207    char endstr[5] = "end";
208    for ( int i=1; i<numtasks; i++ )
209    {
210      MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
211    }
212
213    // Free up manifest files
214    cerr << " - freeing shared memory used by manifest files" << endl;
215    for ( int i = 0; i < manifest_files.size(); i++)
216    {
217      free(manifest_files[i]);
218    }
219    cerr << " * Master Exiting" << endl;
220  }
221  // slave node processing
222  else
223  {
224    ///out << "Worker Starting" << endl;
225    char incoming[BUFFERSIZE];
226
227    int counter = 0;
228
229    do
230    {
231      // wait for instruction from master
232      int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
233      if (resval != MPI_SUCCESS)
234      {
235        printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
236        MPI_Abort(MPI_COMM_WORLD, rc);
237      }
238      counter++;
239      if (strcmp (incoming, "end") != 0)
240      {
241        // process a received job
242        cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl;
243
244        // create Greenstone import command
245        // - incoming now contains the name of the manifest file to process
246        char command[2048];
247
248#ifdef HARDAFFINITY
249        int cpu = rank - 1;
250        cerr << "Setting affinity for worker " << rank << " to cpu " << cpu << endl;
251        if (site != "")
252        {
253          sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", cpu, gsdlhomedir, incoming, site, collection);
254        }
255        else
256        {
257          sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", cpu, gsdlhomedir, incoming, collection);
258        }
259#else
260        if (site != "")
261        {
262          sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection);
263        }
264        else
265        {
266          //sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", gsdlhomedir, incoming, collection);
267          sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s > %s/collect/%s/logs/import-p%d-%d.log 2>&1", gsdlhomedir, incoming, collection, gsdlhomedir, collection, rank, counter);
268        }
269#endif
270
271        cerr << "**** cmd = " << command << endl;
272
273        // invoke Greenstone import with manifest file
274        system (command);
275        cerr << "**** complete" << endl;
276        char line = ' ';
277        // send completed message
278        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
279      }
280    }
281    while (strcmp (incoming, "end") != 0);
282    // stop when "end" instruction is received
283    cerr << "Worker Exiting" << endl;
284  }
285
286  // clean up MPI environment
287  cerr << "Finalizing..." << endl;
288  MPI_Finalize();
289}
Note: See TracBrowser for help on using the browser.