source: gs2-extensions/parallel-building/trunk/src/src/mpidspacemediafilter-src/mpidspacemediafilter.cpp@ 25810

Last change on this file since 25810 was 25810, checked in by jmt12, 12 years ago

MPI executable to provide parallel processing functionality to DSpace in a fashion similar to our Greenstone parallel processing tools

  • Property svn:executable set to *
File size: 5.5 KB
RevLine 
// Master-worker program to read in a list of files and invoke
// dspace filter-media on each separately using OpenMPI
//
// John Thompson
// 8 June 2012
6
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>

#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
17
18using namespace std;
19
20#define BUFFERSIZE 512
21
22int
23main ( int argc, char *argv [] )
24{
25 int numtasks, rank, rc; // MPI variables
26
27 if (3 != argc )
28 {
29 cerr << "Usage: " << argv[0] << " dspacehome filelist" << endl;
30 exit(-1);
31 }
32
33 char *dspacehomedir = argv[1]; // location of import script
34 char *filelist = argv[2];
35
36 // start MPI environment
37 rc = MPI_Init(&argc,&argv);
38 if (rc != MPI_SUCCESS)
39 {
40 printf ("Error starting MPI program. Terminating.\n");
41 MPI_Abort(MPI_COMM_WORLD, rc);
42 }
43
44 // We'll handle errors ourselves
45 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
46
47 // get MPI variables: number of processors and processor number
48 MPI_Status stat;
49 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
50 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
51
52 // master node processing
53 if (rank == 0)
54 {
55 cerr << " * Master Starting" << endl;
56 char incoming[BUFFERSIZE]; // buffer for acknowledgments
57 char buffer[BUFFERSIZE]; // buffer to send tasks
58 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
59 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
60 int actualTasks = 0; // number of processors running
61
62 // open file listing filenames to process
63 ifstream infile;
64 infile.open (filelist);
65 string line_str;
66
67 // set initial status of all processors to idle
68 cerr << " - initializing processor state" << endl;
69 for ( int j=0; j<BUFFERSIZE; j++ )
70 {
71 incoming[j] = ' ';
72 }
73
74 // scan through contents of file listing
75 cerr << " - processing contents of filelist.txt" << endl;
76 while (!infile.eof ())
77 {
78 // get a filename
79 getline (infile, line_str);
80 if (line_str.length() > 0)
81 {
82 // search for idle processor
83 int dest=0;
84 int found = 0;
85 cerr << " - searching for idle processor" << endl;
86 while ((dest<(numtasks-1)) && (found == 0))
87 {
88 if (incoming[dest] == ' ')
89 {
90 found = 1;
91 }
92 else
93 {
94 dest++;
95 }
96 }
97
98 // if no idle processor, wait for one to become idle
99 if (found == 0)
100 {
101 cerr << " - waiting for processor to become idle" << endl;
102 MPI_Waitany (numtasks-1, request, &dest, status);
103 }
104
105 // No need for manifests - just send the identifier of the item
106 // to filter to the worker
107 cerr << " - writing dspace identifier as instruction to worker" << endl;
108 char *line = new char [line_str.size() + 1];
109 strcpy(line, line_str.c_str());
110 sprintf(buffer, "%s", line);
111
112 // mark processors as busy
113 incoming[dest] = 'B';
114 // send out the job to the processor
115 cerr << " - asking worker to start" << endl;
116 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
117 // wait for a done acknowledgement
118 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
119 cerr << " - worker replied that it has started" << endl;
120 // update counter of actual tasks
121 if (dest > actualTasks)
122 {
123 actualTasks = dest;
124 cerr << " - increased the number of running workers to: " << actualTasks << endl;
125 }
126 }
127 }
128
129 infile.close();
130
131 // wait until all outstanding tasks are completed
132 cerr << " - waiting for all outstanding tasks to complete" << endl;
133 int dest;
134 for ( int k=0; k<actualTasks; k++ )
135 {
136 MPI_Waitany (actualTasks, request, &dest, status);
137 }
138
139 // send message to end all processing engines
140 cerr << " * Master asking children to exit" << endl;
141 char endstr[5] = "end";
142 for ( int i=1; i<numtasks; i++ )
143 {
144 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
145 }
146
147 }
148 // slave node processing
149 else
150 {
151 ///out << "Worker Starting" << endl;
152 char incoming[BUFFERSIZE];
153
154 int counter = 0;
155
156 do
157 {
158 // wait for instruction from master
159 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
160 if (resval != MPI_SUCCESS)
161 {
162 printf ("Error when recieving message from master [%d]... Terminating.\n", resval);
163 MPI_Abort(MPI_COMM_WORLD, rc);
164 }
165 counter++;
166 if (strcmp (incoming, "end") != 0)
167 {
168 // process a received job
169 cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl;
170
171 // create DSpace filter-media command
172 // - incoming now contains the identifier of the item to filter
173 char command[2048];
174
175 sprintf (command, "%s/bin/dspace filter-media -f -i \"%s\" > /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter);
176
177 cerr << "**** cmd = " << command << endl;
178
179 // invoke dspace
180 system (command);
181 cerr << "**** complete" << endl;
182 char line = ' ';
183 // send completed message
184 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
185 }
186 }
187 while (strcmp (incoming, "end") != 0);
188 // stop when "end" instruction is received
189 cerr << "Worker Exiting" << endl;
190 }
191
192 // clean up MPI environment
193 cerr << "Finalizing..." << endl;
194 MPI_Finalize();
195}
Note: See TracBrowser for help on using the repository browser.