source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 24589

Last change on this file since 24589 was 24589, checked in by jmt12, 13 years ago

Initial checkin

  • Property svn:executable set to *
File size: 7.0 KB
RevLine 
// Master-worker program to read in a list of files and invoke
// import on each separately using manifest files in Greenstone 3,
// with synchronisation using OpenMPI
//
// Hussein Suleman
// 1 July 2010

// 2011MAR - added a bunch of comments to aid [my] understanding. jmt12
//         - made site argument optional to support GS2. jmt12
//         - moving manifest writing code into the rank=0 thread. This will
//           remove the artificial limit on epoch size caused by size of
//           message buffer between controller and child threads. jmt12

#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fstream>
#include <iostream>
#include <string>
#include <vector>

24using namespace std;
25
26#define BUFFERSIZE 512
27
28int
29main ( int argc, char *argv [] )
30{
31 int numtasks, rank, rc; // MPI variables
32
33 if (5 != argc && argc != 6 )
34 {
35 cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;
36 exit(-1);
37 }
38
39 char *filelist = argv[1]; // list of filenames
40 char *epochStr = argv[2]; // number of files per task
41 int epoch = atoi (epochStr);
42 char *gsdlhomedir = argv[3]; // location of import script
43 char *collection = argv[4]; // Greenstone collection
44 char *site = "";
45 if (argc == 6)
46 {
47 site = argv[5]; // Greenstone site
48 }
49
50 // start MPI environment
51 rc = MPI_Init(&argc,&argv);
52 if (rc != MPI_SUCCESS)
53 {
54 printf ("Error starting MPI program. Terminating.\n");
55 MPI_Abort(MPI_COMM_WORLD, rc);
56 }
57
58 // We'll handle errors ourselves
59 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
60
61 // get MPI variables: number of processors and processor number
62 MPI_Status stat;
63 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
64 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
65
66 // master node processing
67 if (rank == 0)
68 {
69 char incoming[BUFFERSIZE]; // buffer for acknowledgments
70 char buffer[BUFFERSIZE]; // buffer to send tasks
71 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
72 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
73 int actualTasks = 0; // number of processors running
74
75 vector<char *> manifest_files; // Keep track of temp manifest filenames
76
77 // open file listing filenames to process
78 ifstream infile;
79 infile.open (filelist);
80 string line;
81
82 // set initial status of all processors to idle
83 for ( int j=0; j<BUFFERSIZE; j++ )
84 {
85 incoming[j] = ' ';
86 }
87
88 // scan through contents of file listing
89 while (!infile.eof ())
90 {
91 // get a filename
92 getline (infile, line);
93 if (line.length() > 0)
94 {
95 // search for idle processor
96 int dest=0;
97 int found = 0;
98 while ((dest<(numtasks-1)) && (found == 0))
99 {
100 if (incoming[dest] == ' ')
101 {
102 found = 1;
103 }
104 else
105 {
106 dest++;
107 }
108 }
109
110 // if no idle processor, wait for one to become idle
111 if (found == 0)
112 {
113 MPI_Waitany (numtasks-1, request, &dest, status);
114 }
115
116 // construct manifest filename
117 char manifestfile_buffer[128];
118 sprintf (manifestfile_buffer, "%u.manifest.xml", rank);
119 char* manifestfilename = tempnam(NULL,manifestfile_buffer);
120 ofstream manifestfile;
121 if (manifestfilename != NULL)
122 {
123 // create manifest file
124 manifestfile.open (manifestfilename);
125 if (manifestfile.fail())
126 {
127 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
128 }
129 }
130 else
131 {
132 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
133 }
134 manifestfile << "<Manifest><Index>" << endl;
135
136 // add the first filename to the instruction
137 manifestfile << "<Filename>" << line << "</Filename>" << endl;
138 int epochCounter = epoch;
139
140 // if epoch>1 and more filenames, add more filenames
141 while ((epochCounter > 1) && (!infile.eof ()))
142 {
143 getline (infile, line);
144 if (line.length () > 0)
145 {
146 manifestfile << "<Filename>" << line << "</Filename>" << endl;
147 epochCounter--;
148 }
149 }
150
151 manifestfile << "</Index></Manifest>" << endl;
152 manifestfile.close ();
153
154 // Store manifest filepath so we can remove it later
155 manifest_files.push_back(manifestfilename);
156
157 // Send the manifest filename as the instruction
158 sprintf(buffer, "%s", manifestfilename);
159
160 // mark processors as busy
161 incoming[dest] = 'B';
162 // send out the job to the processor
163 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
164 // wait for a done acknowledgement
165 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
166 // update counter of actual tasks
167 if (dest > actualTasks)
168 {
169 actualTasks = dest;
170 }
171 }
172 }
173
174 infile.close();
175
176 // wait until all outstanding tasks are completed
177 int dest;
178 for ( int k=0; k<actualTasks; k++ )
179 {
180 MPI_Waitany (actualTasks, request, &dest, status);
181 }
182
183 // send message to end all processing engines
184 char endstr[5] = "end";
185 for ( int i=1; i<numtasks; i++ )
186 {
187 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
188 }
189
190 // Free up manifest files
191 for ( int i = 0; i < manifest_files.size(); i++)
192 {
193 free(manifest_files[i]);
194 }
195
196 }
197 // slave node processing
198 else
199 {
200 char incoming[BUFFERSIZE];
201
202 do
203 {
204 // wait for instruction from master
205 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
206 if (resval != MPI_SUCCESS)
207 {
208 printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
209 MPI_Abort(MPI_COMM_WORLD, rc);
210 }
211 if (strcmp (incoming, "end") != 0)
212 {
213 // process a received job
214 cout << "Processing [" << rank << ":" << incoming << "]" << endl;
215
216 // create Greenstone import command
217 // - incoming now contains the name of the manifest file to process
218 char command[2048];
219 if (site != "")
220 {
221 sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection);
222 }
223 else
224 {
225 sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s", gsdlhomedir, incoming, collection);
226 }
227 cout << "**** cmd = " << command << endl;
228
229 // invoke Greenstone import with manifest file
230 system (command);
231
232 char line = ' ';
233 // send completed message
234 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
235 }
236 }
237 while (strcmp (incoming, "end") != 0);
238 // stop when "end" instruction is received
239 }
240
241 // clean up MPI environment
242 MPI_Finalize();
243}
Note: See TracBrowser for help on using the repository browser.