source: gs2-extensions/parallel-building/trunk/src/src/mpidspacemediafilter-src/mpidspacemediafilter.cpp@25944

Last change on this file since 25944 was 25944, checked in by jmt12, 12 years ago

More debug comments, fixed memory leak - still hanging unfortunately

  • Property svn:executable set to *
File size: 7.1 KB
// Master-worker program to read in a list of files and invoke
// dspace filter-media on each separately using OpenMPI
//
// John Thompson
// 8 June 2012

#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h> // strcpy, strlen, strcmp
#include <time.h>   // time()

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

using namespace std;

#define BUFFERSIZE 512

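// Illustrative launch (the paths and process count below are examples, not
// part of this program): rank 0 becomes the master and ranks 1..N-1 become
// workers, so -np must be at least 2.
//
//   mpirun -np 9 ./mpidspacemediafilter /opt/greenstone /opt/dspace filelist.txt
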
int
main ( int argc, char *argv [] )
{
  int numtasks, rank, rc; // MPI variables
  unsigned long int seconds = 0;

  if (4 != argc)
  {
    fprintf(stderr, "Usage: mpidspacemediafilter gsdlhome dspacehome filelist\n");
    exit(-1);
  }

  char *gsdlhomedir = argv[1];
  char *dspacehomedir = argv[2]; // location of import script
  char *filelist = argv[3];

  // start MPI environment
  rc = MPI_Init(&argc, &argv);
  if (rc != MPI_SUCCESS)
  {
    fprintf(stderr, "Error starting MPI program. Terminating.\n");
    MPI_Abort(MPI_COMM_WORLD, rc);
  }

  // We'll handle errors ourselves
  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
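  // (MPI_Errhandler_set was deprecated in MPI-2; newer MPI libraries
  // provide the equivalent MPI_Comm_set_errhandler.)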

  // get MPI variables: number of processors and processor number
  MPI_Status stat;
  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Get processor name too - important when it could be anywhere in a cluster
  int name_length;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Get_processor_name(processor_name, &name_length);

  // master node processing
  if (rank == 0)
  {
    seconds = time(NULL);
    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
    char incoming[BUFFERSIZE];       // buffer for acknowledgements
    char buffer[BUFFERSIZE];         // buffer to send tasks
    MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
    MPI_Status status[BUFFERSIZE];   // status monitor for all tasks
    int actualTasks = 0;             // number of processors running
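    // Bookkeeping scheme: incoming[d] holds worker d+1's state (' ' = idle,
    // 'B' = busy) and doubles as the landing slot for that worker's one-byte
    // acknowledgement; request[d] is the pending non-blocking receive for
    // that acknowledgement. All three arrays are sized BUFFERSIZE, which
    // caps the number of usable workers at BUFFERSIZE.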

    // open file listing filenames to process
    ifstream infile;
    infile.open (filelist);
    string line_str;

    // set initial status of all processors to idle
    fprintf(stderr, "[M] Initializing processor state\n");
    for ( int j = 0; j < BUFFERSIZE; j++ )
    {
      incoming[j] = ' ';
    }

    // scan through contents of file listing
    fprintf(stderr, "[M] Processing contents of %s\n", filelist);
    // read one filename per iteration; using getline() as the loop
    // condition avoids the eof() pitfall of processing the final line twice
    while (getline (infile, line_str))
    {
      if (line_str.length() > 0)
      {
        // search for idle processor
        int dest = 0;
        int found = 0;
        fprintf(stderr, "[M] Searching for idle processor\n");
        while ((dest < (numtasks - 1)) && (found == 0))
        {
          if (incoming[dest] == ' ')
          {
            found = 1;
          }
          else
          {
            dest++;
          }
        }

        // if no idle processor, wait for one to become idle
        if (found == 0)
        {
          fprintf(stderr, "[M] Waiting for processor to become idle\n");
          MPI_Waitany (numtasks - 1, request, &dest, status);
        }
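        // (MPI_Waitany blocks until one of the posted acknowledgement
        // receives completes, returning its index in dest: the slot of a
        // worker that has just gone idle.)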

        // No need for manifests - just send the identifier of the item
        // to filter to the worker. The identifier is copied out with
        // c_str() because a non-POD std::string cannot be passed through
        // printf-style varargs.
        fprintf(stderr, "[M] Writing dspace identifier as instruction to worker\n");
        snprintf(buffer, BUFFERSIZE, "%s", line_str.c_str());

        // mark processor as busy
        incoming[dest] = 'B';
        // send out the job to the processor
        fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1));
        MPI_Send (buffer, strlen (buffer) + 1, MPI_CHAR, dest + 1, 1, MPI_COMM_WORLD);
        // post a non-blocking receive for the worker's done acknowledgement
        MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest + 1, 1, MPI_COMM_WORLD, &request[dest]);
        fprintf(stderr, "[M] Posted acknowledgement receive for worker %d\n", (dest + 1));
        // update counter of actual tasks; dest is zero-based, so dest + 1
        // workers have been used so far
        if (dest + 1 > actualTasks)
        {
          actualTasks = dest + 1;
          fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks);
        }
      }
    }

    infile.close();

    // wait until all outstanding tasks are completed
    fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n");
    int dest;
    for ( int k = 0; k < actualTasks; k++ )
    {
      MPI_Waitany (actualTasks, request, &dest, status);
    }

    // send message to end all processing engines
    fprintf(stderr, "[M] Master asking children to exit\n");
    char endstr[5] = "end";
    for ( int i = 1; i < numtasks; i++ )
    {
      MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
    }
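    // Shutdown protocol: each idle worker is blocked in MPI_Recv, so the
    // 4-byte "end" message (three characters plus the terminating NUL)
    // satisfies its strcmp test and breaks it out of the job loop below.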

    seconds = time(NULL);
    fprintf(stderr, "[M:%lu] Master will exit when workers complete\n", seconds);
  }
  // slave node processing
  else
  {
    seconds = time(NULL);
    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);

    // Check whether GSDLHOME exists in the environment (it will on a
    // multicore computer, but won't on the compute nodes of a cluster).
    // It will be NULL if "source setup.bash" hasn't been run on this
    // computer.
    const char * gsdlhometest = getenv("GSDLHOME");

    char incoming[BUFFERSIZE];
    int counter = 0;
    do
    {
      // wait for instruction from master
      int resval = MPI_Recv (incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
      if (resval != MPI_SUCCESS)
      {
        fprintf(stderr, "[W%d] Error when receiving message from master... terminating (%d).\n", rank, resval);
        MPI_Abort(MPI_COMM_WORLD, resval);
      }
      counter++;
      if (strcmp (incoming, "end") != 0)
      {
        // process a received job
        seconds = time(NULL);
        fprintf(stderr, "[W%d:%lu] Processing job %d: %s\n", rank, seconds, counter, incoming);

        // create DSpace filter-media command
        // - incoming now contains the identifier of the item to filter
        char command[2048];
        if (gsdlhometest != NULL)
        {
          snprintf (command, sizeof(command), "%s/bin/dspace filter-media -f -i \"%s\" > /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter);
        }
        else
        {
          snprintf (command, sizeof(command), "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/dspace filter-media -f -i %s > /tmp/dspace_media_filter-P%d-C%d.log 2>&1\"", gsdlhomedir, dspacehomedir, incoming, rank, counter);
        }
        fprintf(stderr, "[W%d] system('%s')\n", rank, command);
        // invoke dspace
        system (command);
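        // Note: system()'s exit status is discarded, so a failed
        // filter-media run only shows up in the per-job log written
        // to /tmp above.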

        // send completed message
        char line = ' ';
        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
        seconds = time(NULL);
        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
      }
    }
    while (strcmp (incoming, "end") != 0);
    // stop when "end" instruction is received
    seconds = time(NULL);
    fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
  }

  // clean up MPI environment
  if (rank == 0)
  {
    fprintf(stderr, "[M] Finalizing...\n");
  }
  else
  {
    fprintf(stderr, "[W%d] Finalizing...\n", rank);
  }
  MPI_Finalize();
}