root/gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp @ 24589

Revision 24589, 7.0 KB (checked in by jmt12, 8 years ago)

Initial checkin

  • Property svn:executable set to *
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
8// 2011MAR - added a bunch of commands to aid [my] understanding. jmt12
9//         - made site argument optional to support GS2. jmt12
10//         - moving manifest writing code into the rank=0 thread. This will
11//           remove the artificial limit on epoch size caused by size of
12//           message buffer between controller and child threads. jmt12
13
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fstream>
#include <iostream>
#include <string>
#include <vector>
23
24using namespace std;
25
26#define BUFFERSIZE 512
27
28int
29main ( int argc, char *argv [] )
30{
31  int numtasks, rank, rc;            // MPI variables
32
33  if (5 != argc && argc != 6 )
34  {
35    cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;
36    exit(-1);
37  }
38
39  char *filelist = argv[1];          // list of filenames
40  char *epochStr = argv[2];          // number of files per task
41  int epoch = atoi (epochStr);
42  char *gsdlhomedir = argv[3];      // location of import script
43  char *collection = argv[4];        // Greenstone collection
44  char *site = "";
45  if (argc == 6)
46  {
47    site = argv[5];              // Greenstone site
48  }
49
50  // start MPI environment
51  rc = MPI_Init(&argc,&argv);
52  if (rc != MPI_SUCCESS)
53  {
54    printf ("Error starting MPI program. Terminating.\n");
55    MPI_Abort(MPI_COMM_WORLD, rc);
56  }
57
58  // We'll handle errors ourselves
59  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
60
61  // get MPI variables: number of processors and processor number
62  MPI_Status stat;
63  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
64  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
65
66  // master node processing
67  if (rank == 0)
68  {
69    char incoming[BUFFERSIZE];          // buffer for acknowledgments
70    char buffer[BUFFERSIZE];         // buffer to send tasks
71    MPI_Request request[BUFFERSIZE];    // request monitor for all tasks
72    MPI_Status status[BUFFERSIZE];      // status monitor for all tasks
73    int actualTasks = 0;         // number of processors running
74
75    vector<char *> manifest_files; // Keep track of temp manifest filenames
76
77    // open file listing filenames to process
78    ifstream infile;
79    infile.open (filelist);
80    string line;
81
82    // set initial status of all processors to idle
83    for ( int j=0; j<BUFFERSIZE; j++ )
84    {
85      incoming[j] = ' ';
86    }
87
88    // scan through contents of file listing
89    while (!infile.eof ())
90    {
91      // get a filename
92      getline (infile, line);
93      if (line.length() > 0)
94      {
95        // search for idle processor
96        int dest=0;
97        int found = 0;
98        while ((dest<(numtasks-1)) && (found == 0))
99        {
100          if (incoming[dest] == ' ')
101          {
102            found = 1;
103          }
104          else
105          {
106            dest++;
107          }
108        }
109
110        // if no idle processor, wait for one to become idle
111        if (found == 0)
112        {
113          MPI_Waitany (numtasks-1, request, &dest, status);
114        }
115
116        // construct manifest filename
117        char manifestfile_buffer[128];
118        sprintf (manifestfile_buffer, "%u.manifest.xml", rank);
119        char* manifestfilename = tempnam(NULL,manifestfile_buffer);
120        ofstream manifestfile;
121        if (manifestfilename != NULL)
122        {
123          // create manifest file
124          manifestfile.open (manifestfilename);
125          if (manifestfile.fail())
126          {
127            cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
128          }
129        }
130        else
131        {
132          cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
133        }
134        manifestfile << "<Manifest><Index>" << endl;
135
136        // add the first filename to the instruction
137        manifestfile << "<Filename>" << line << "</Filename>" << endl;
138        int epochCounter = epoch;
139
140        // if epoch>1 and more filenames, add more filenames
141        while ((epochCounter > 1) && (!infile.eof ()))
142        {
143          getline (infile, line);
144          if (line.length () > 0)
145          {
146            manifestfile << "<Filename>" << line << "</Filename>" << endl;
147            epochCounter--;
148          }
149        }
150
151        manifestfile << "</Index></Manifest>" << endl;
152        manifestfile.close ();
153
154        // Store manifest filepath so we can remove it later
155        manifest_files.push_back(manifestfilename);
156
157        // Send the manifest filename as the instruction
158        sprintf(buffer, "%s", manifestfilename);
159
160        // mark processors as busy
161        incoming[dest] = 'B';
162        // send out the job to the processor
163        MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
164        // wait for a done acknowledgement
165        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
166        // update counter of actual tasks
167        if (dest > actualTasks)
168        {
169          actualTasks = dest;
170        }
171      }
172    }
173
174    infile.close();
175
176    // wait until all outstanding tasks are completed
177    int dest;
178    for ( int k=0; k<actualTasks; k++ )
179    {
180      MPI_Waitany (actualTasks, request, &dest, status);
181    }
182
183    // send message to end all processing engines
184    char endstr[5] = "end";
185    for ( int i=1; i<numtasks; i++ )
186    {
187      MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
188    }
189
190    // Free up manifest files
191    for ( int i = 0; i < manifest_files.size(); i++)
192    {
193      free(manifest_files[i]);
194    }
195
196  }
197  // slave node processing
198  else
199  {
200    char incoming[BUFFERSIZE];
201
202    do
203    {
204      // wait for instruction from master
205      int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
206      if (resval != MPI_SUCCESS)
207      {
208        printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
209        MPI_Abort(MPI_COMM_WORLD, rc);
210      }
211      if (strcmp (incoming, "end") != 0)
212      {
213        // process a received job
214        cout << "Processing [" << rank << ":" << incoming << "]" << endl;
215
216        // create Greenstone import command
217        // - incoming now contains the name of the manifest file to process
218        char command[2048];
219        if (site != "")
220        {
221          sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection);
222        }
223        else
224        {
225          sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s", gsdlhomedir, incoming, collection);
226        }
227        cout << "**** cmd = " << command << endl;
228
229        // invoke Greenstone import with manifest file
230        system (command);
231
232        char line = ' ';
233        // send completed message
234        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
235      }
236    }
237    while (strcmp (incoming, "end") != 0);
238    // stop when "end" instruction is received
239  }
240
241  // clean up MPI environment
242  MPI_Finalize();
243}
Note: See TracBrowser for help on using the browser.