source: main/trunk/greenstone2/build-src/src/mpi/farmer.cpp@ 22449

Last change to this file was in revision 22449, checked in by davidb, 14 years ago

Supporting code for parallel_import.pl that uses Open-MPI to farm out processes in parallel

  • Property svn:executable set to *
File size: 5.7 KB
Line 
1// Master-worker program to read in a list of files and invoke
2// import on each separately using manifest files in Greenstone 3,
3// with synchronisation using OpenMPI
4//
5// hussein suleman
6// 1 july 2010
7
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fstream>
#include <iostream>
#include <string>
17using namespace std;
18
19#define KILOBUF 512
20//#define MEGABUF 655360
21#define MEGABUF 10240
22
23int main( int argc, char *argv [] )
24{
25 int numtasks, rank, rc; // MPI variables
26
27 if (argc != 6 ) {
28 cerr << "Usage: " << argv[0] << "filelist epoch gsdlhome site collection" << endl;
29 exit(-1);
30 }
31
32 char *filelist = argv[1]; // list of filenames
33 char *epochStr = argv[2]; // number of files per task
34 int epoch = atoi (epochStr);
35 char *gsdlhomedir = argv[3]; // location of import script
36 char *site = argv[4]; // Greenstone site
37 char *collection = argv[5]; // Greenstone collection
38
39 // start MPI environment
40 rc = MPI_Init(&argc,&argv);
41 if (rc != MPI_SUCCESS) {
42 printf ("Error starting MPI program. Terminating.\n");
43 MPI_Abort(MPI_COMM_WORLD, rc);
44 }
45
46 // get MPI variables: number of processors and processor number
47 MPI_Status stat;
48 MPI_Comm_size(MPI_COMM_WORLD,&numtasks);
49 MPI_Comm_rank(MPI_COMM_WORLD,&rank);
50
51
52 if (rank == 0)
53 // master node processing
54 {
55 char incoming[KILOBUF]; // buffer for acknowledgments
56 char buffer[MEGABUF]; // buffer to send tasks
57 MPI_Request request[KILOBUF]; // request monitor for all tasks
58 MPI_Status status[KILOBUF]; // status monitor for all tasks
59 int actualTasks = 0; // number of processors running
60
61 // open file listing filenames to process
62 ifstream infile;
63 infile.open (filelist);
64 string line;
65
66 // set initial status of all processors to idle
67 for ( int j=0; j<KILOBUF; j++ )
68 incoming[j] = ' ';
69
70 // scan through contents of file listing
71 while (!infile.eof ())
72 {
73 // get a filename
74 getline (infile, line);
75 if (line.length() > 0)
76 {
77 // search for idle processor
78 int dest=0;
79 int found = 0;
80 while ((dest<(numtasks-1)) && (found == 0))
81 if (incoming[dest] == ' ')
82 found = 1;
83 else
84 dest++;
85
86 // if no idle processor, wait for one to become idle
87 if (found == 0) {
88 MPI_Waitany (numtasks-1, request, &dest, status);
89 }
90
91 // add the first filename to the instruction
92 sprintf (buffer, "<Filename>%s</Filename>", line.c_str ());
93 int epochCounter = epoch;
94
95 // if epoch>1 and more filenames, add more filenames
96 while ((epochCounter > 1) && (!infile.eof ()))
97 {
98 getline (infile, line);
99 if (line.length () > 0)
100 {
101 char buffer2[1024];
102 sprintf (buffer2, "<Filename>%s</Filename>", line.c_str ());
103 strcat (buffer, buffer2);
104 }
105 epochCounter--;
106 }
107
108 // mark processors as busy
109 incoming[dest] = 'B';
110 // send out the job to the processor
111 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
112 // wait for a done acknowledgement
113 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
114 // update counter of actual tasks
115 if (dest > actualTasks)
116 actualTasks = dest;
117 }
118 }
119
120 infile.close();
121
122 // wait until all outstanding tasks are completed
123 int dest;
124 for ( int k=0; k<actualTasks; k++ )
125 MPI_Waitany (actualTasks, request, &dest, status);
126
127 // send message to end all processing engines
128 char endstr[5] = "end";
129 for ( int i=1; i<numtasks; i++ )
130 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
131
132 }
133 else
134 // slave node processing
135 {
136 char incoming[MEGABUF];
137
138 do {
139 // wait for instruction from master
140 MPI_Recv (&incoming, MEGABUF, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
141 if (strcmp (incoming, "end") != 0)
142 {
143 // process a received job
144 cout << "Processing [" << rank << ":" << incoming << "]" << endl;
145
146 // construct manifest filename
147 char manifestfile[128];
148 sprintf (manifestfile, "%u.manifest.xml", rank);
149 char* manifestfilename = tempnam(NULL,manifestfile);
150 if (manifestfilename != NULL) {
151
152 // create manifest file
153 ofstream manifestfile;
154 manifestfile.open (manifestfilename);
155 manifestfile << "<Manifest><Index>" << incoming << "</Index></Manifest>" << endl;
156 manifestfile.close ();
157
158 // create Greenstone import command
159 char command[2048];
160 sprintf (command, "%s\\bin\\script\\import.pl -keepold -manifest %s -site %s %s", gsdlhomedir, manifestfilename, site, collection);
161
162 // cout << "**** cmd = " << command << endl;
163
164 // invoke Greenstone import with manifest file
165 system (command);
166
167 char line = ' ';
168 // send completed message
169 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
170
171 free(manifestfilename);
172 }
173 else {
174 cerr << "Error: Unable to create temporary manifest file for rank=" << rank << endl;
175 }
176 }
177 } while (strcmp (incoming, "end") != 0);
178 // stop when "end" instruction is received
179
180 }
181
182 // clean up MPI environment
183 MPI_Finalize();
184}
185
Note: See TracBrowser for help on using the repository browser.