source: gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp@ 24699

Last change on this file since 24699 was 24699, checked in by jmt12, 13 years ago

Write the manifest files to the collection's tmp directory so we can play with IO and RAMDisks

  • Property svn:executable set to *
File size: 7.4 KB
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// Hussein Suleman
6// 1 July 2010
7
8// 2011MAR - added a bunch of commands to aid (my) understanding.
9// - made site argument optional to support GS2.
10// - moved the manifest writing code into the rank=0 process. This
11// removes the artificial limit on epoch size caused by the size of
12// the message buffer between controller and child processes. [jmt12]
13// 2011SEP - changing where the manifest files are written so I have the
14// ability to set up the location as a RAMDisk. [jmt12]
15
16#include "mpi.h"
17
18#include <stdio.h>
19#include <stdlib.h>
20
21#include <fstream>
22#include <iostream>
23#include <sstream>
24#include <string>
25#include <vector>
26
27using namespace std;
28
29#define BUFFERSIZE 512
30
31int
32main ( int argc, char *argv [] )
33{
34 int numtasks, rank, rc; // MPI variables
35
36 if (5 != argc && argc != 6 )
37 {
38 cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;
39 exit(-1);
40 }
41
42 char *filelist = argv[1]; // list of filenames
43 char *epochStr = argv[2]; // number of files per task
44 int epoch = atoi (epochStr);
45 char *gsdlhomedir = argv[3]; // location of import script
46 char *collection = argv[4]; // Greenstone collection
47 char *site = "";
48 if (argc == 6)
49 {
50 site = argv[5]; // Greenstone site
51 }
52
53 // start MPI environment
54 rc = MPI_Init(&argc,&argv);
55 if (rc != MPI_SUCCESS)
56 {
57 printf ("Error starting MPI program. Terminating.\n");
58 MPI_Abort(MPI_COMM_WORLD, rc);
59 }
60
61 // We'll handle errors ourselves
62 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
63
64 // get MPI variables: number of processors and processor number
65 MPI_Status stat;
66 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
67 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
68
69 // master node processing
70 if (rank == 0)
71 {
72 char incoming[BUFFERSIZE]; // buffer for acknowledgments
73 char buffer[BUFFERSIZE]; // buffer to send tasks
74 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
75 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
76 int actualTasks = 0; // number of processors running
77
78 vector<char *> manifest_files; // Keep track of temp manifest filenames
79
80 // open file listing filenames to process
81 ifstream infile;
82 infile.open (filelist);
83 string line;
84
85 // set initial status of all processors to idle
86 for ( int j=0; j<BUFFERSIZE; j++ )
87 {
88 incoming[j] = ' ';
89 }
90
91 // scan through contents of file listing
92 int manifest_file_count = 1;
93 while (!infile.eof ())
94 {
95 // get a filename
96 getline (infile, line);
97 if (line.length() > 0)
98 {
99 // search for idle processor
100 int dest=0;
101 int found = 0;
102 while ((dest<(numtasks-1)) && (found == 0))
103 {
104 if (incoming[dest] == ' ')
105 {
106 found = 1;
107 }
108 else
109 {
110 dest++;
111 }
112 }
113
114 // if no idle processor, wait for one to become idle
115 if (found == 0)
116 {
117 MPI_Waitany (numtasks-1, request, &dest, status);
118 }
119
120 // construct manifest filename
121 stringstream manifestfilename_strstr;
122 manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml";
123 string manifestfilename_str = manifestfilename_strstr.str();
124 char *manifestfilename = new char [manifestfilename_str.size() + 1];
125 strcpy(manifestfilename, manifestfilename_str.c_str());
126 ofstream manifestfile;
127 if (manifestfilename != NULL)
128 {
129 // create manifest file
130 manifestfile.open (manifestfilename);
131 if (manifestfile.fail())
132 {
133 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
134 }
135 }
136 else
137 {
138 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;
139 }
140 manifestfile << "<Manifest><Index>" << endl;
141
142 // add the first filename to the instruction
143 manifestfile << "<Filename>" << line << "</Filename>" << endl;
144 int epochCounter = epoch;
145
146 // if epoch>1 and more filenames, add more filenames
147 while ((epochCounter > 1) && (!infile.eof ()))
148 {
149 getline (infile, line);
150 if (line.length () > 0)
151 {
152 manifestfile << "<Filename>" << line << "</Filename>" << endl;
153 epochCounter--;
154 }
155 }
156
157 manifestfile << "</Index></Manifest>" << endl;
158 manifestfile.close ();
159
160 // Store manifest filepath so we can remove it later
161 manifest_files.push_back(manifestfilename);
162
163 // Send the manifest filename as the instruction
164 sprintf(buffer, "%s", manifestfilename);
165
166 // mark processors as busy
167 incoming[dest] = 'B';
168 // send out the job to the processor
169 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
170 // wait for a done acknowledgement
171 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
172 // update counter of actual tasks
173 if (dest > actualTasks)
174 {
175 actualTasks = dest;
176 }
177 // increase number of manifest files processed
178 manifest_file_count++;
179 }
180 }
181
182 infile.close();
183
184 // wait until all outstanding tasks are completed
185 int dest;
186 for ( int k=0; k<actualTasks; k++ )
187 {
188 MPI_Waitany (actualTasks, request, &dest, status);
189 }
190
191 // send message to end all processing engines
192 char endstr[5] = "end";
193 for ( int i=1; i<numtasks; i++ )
194 {
195 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
196 }
197
198 // Free up manifest files
199 for ( int i = 0; i < manifest_files.size(); i++)
200 {
201 free(manifest_files[i]);
202 }
203
204 }
205 // slave node processing
206 else
207 {
208 char incoming[BUFFERSIZE];
209
210 do
211 {
212 // wait for instruction from master
213 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
214 if (resval != MPI_SUCCESS)
215 {
216 printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
217 MPI_Abort(MPI_COMM_WORLD, rc);
218 }
219 if (strcmp (incoming, "end") != 0)
220 {
221 // process a received job
222 cout << "Processing [" << rank << ":" << incoming << "]" << endl;
223
224 // create Greenstone import command
225 // - incoming now contains the name of the manifest file to process
226 char command[2048];
227 if (site != "")
228 {
229 sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s -site %s %s", gsdlhomedir, incoming, site, collection);
230 }
231 else
232 {
233 sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s", gsdlhomedir, incoming, collection);
234 }
235 cout << "**** cmd = " << command << endl;
236
237 // invoke Greenstone import with manifest file
238 system (command);
239
240 char line = ' ';
241 // send completed message
242 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
243 }
244 }
245 while (strcmp (incoming, "end") != 0);
246 // stop when "end" instruction is received
247 }
248
249 // clean up MPI environment
250 MPI_Finalize();
251}
Note: See TracBrowser for help on using the repository browser.