source: gs2-extensions/parallel-building/trunk/src/src/mpiterrierfileindexer-src/mpiterrierfileindexer.cpp@ 26069

Last change on this file since 26069 was 26069, checked in by jmt12, 12 years ago

Initial checkin of Terrier parallel processing code

  • Property svn:executable set to *
File size: 7.0 KB
// Master-worker program to prepare, parallel index, and then merge a Terrier
// based collection
//
// John Thompson
// 3rd August 2012

#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

using namespace std;

#define BUFFERSIZE 512

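// BUFFERSIZE is used both as the length of the character buffers exchanged
// over MPI and as the upper bound on the number of worker slots (request,
// status and acknowledgement entries) that the master tracks.
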
int
main ( int argc, char *argv [] )
{
  int numtasks, rank, rc; // MPI variables
  unsigned long int seconds = 0;

  if (4 != argc )
  {
    fprintf(stderr,"Usage: mpiterrierfileindexer <path to gsdl> <path to terrier> <number of manifest files>\n");
    exit(-1);
  }

  char *gsdlhome_dir = argv[1];
  char *terrier_dir = argv[2];
  int total_number_of_manifest_files = atoi(argv[3]);

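  // Illustrative invocation (the paths and process count here are
  // hypothetical), assuming an MPI launcher such as mpirun:
  //   mpirun -np 5 mpiterrierfileindexer /path/to/gsdl /path/to/terrier 12
  // Rank 0 acts as the master and hands one manifest number at a time to the
  // remaining ranks, which run as workers.
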
  // start MPI environment
  rc = MPI_Init(&argc,&argv);
  if (rc != MPI_SUCCESS)
  {
    fprintf(stderr, "Error starting MPI program. Terminating.\n");
    MPI_Abort(MPI_COMM_WORLD, rc);
  }

  // We'll handle errors ourselves
  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);

  // get MPI variables: number of processors and processor number
  MPI_Status stat;
  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // Get processor name too - important when it could be anywhere in a cluster
  int name_length;
  char processor_name[MPI_MAX_PROCESSOR_NAME];
  MPI_Get_processor_name(processor_name, &name_length);

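  // In outline, the protocol below is: the master sends each worker a
  // zero-padded manifest number as a string, the worker runs the Terrier
  // FileIndexer over the matching manifest file and replies with a
  // single-character acknowledgement, and the master finally sends the
  // string "end" to every worker to tell it to exit.
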
  // master node processing
  if (rank == 0)
  {
    seconds = time(NULL);
    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
    char incoming[BUFFERSIZE]; // buffer for acknowledgments
    char buffer[BUFFERSIZE]; // buffer to send tasks
    MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
    MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
    int actualTasks = 0; // number of processors running
    int manifest_counter = 0;

    // set initial status of all processors to idle
    fprintf(stderr, "[M] Initializing processor state\n");
    for ( int j=0; j<BUFFERSIZE; j++ )
    {
      incoming[j] = ' ';
    }

    // scan through contents of file listing
    fprintf(stderr, "[M] Running work process for each manifest file\n");
    while (manifest_counter < total_number_of_manifest_files)
    {
      // search for idle processor
      int dest=0;
      int found = 0;
      fprintf(stderr, "[M] Searching for idle processor\n");
      while ((dest<(numtasks-1)) && (found == 0))
      {
        if (incoming[dest] == ' ')
        {
          found = 1;
        }
        else
        {
          dest++;
        }
      }

      // if no idle processor, wait for one to become idle
      if (found == 0)
      {
        fprintf(stderr, "[M] Waiting for processor to become idle\n");
        MPI_Waitany (numtasks-1, request, &dest, status);
      }

      // Send manifest number (as 0 padded 3 digit string) to the worker
      fprintf(stderr, "[M] Writing manifest number as instruction to worker\n");
      // Jiggerypokery to get around weird compiler error: cannot pass
      // objects of non-POD type ‘struct std::string’ through ‘...’; call
      // will abort at runtime
      sprintf(buffer, "%03d", manifest_counter);

      // mark processor as busy
      incoming[dest] = 'B';
      // send out the job to the processor
      fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1));
      MPI_Send (buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
      // post a non-blocking receive for the worker's "done" acknowledgement
      MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
      fprintf(stderr, "[M] Job dispatched to worker %d\n", (dest + 1));
      // update count of workers that have been given work
      if (dest + 1 > actualTasks)
      {
        actualTasks = dest + 1;
        fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks);
      }

      // onto the next manifest file
      manifest_counter++;
    }

    // wait until all outstanding tasks are completed
    fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n");
    int dest;
    for ( int k=0; k<actualTasks; k++ )
    {
      MPI_Waitany (actualTasks, request, &dest, status);
    }

    // send message to end all processing engines
    fprintf(stderr,"[M] Master asking children to exit\n");
    char endstr[5] = "end";
    for ( int i=1; i<numtasks; i++ )
    {
      MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
    }

    seconds = time(NULL);
    fprintf(stderr, "[M:%lu] Master will exit when workers complete\n", seconds);
  }

  // worker node processing
  else
  {
    seconds = time(NULL);
    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);

    // Check to see if GSDLHOME exists in the environment (it will on a multicore
    // computer, but won't on compute nodes in a cluster). It will be NULL if
    // source setup.bash hasn't been run (on this computer).
    const char * gsdlhometest = getenv("GSDLHOME");

    char incoming[BUFFERSIZE];
    int counter = 0;
    do
    {
      // wait for instruction from master
      int resval = MPI_Recv (incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
      if (resval != MPI_SUCCESS)
      {
        fprintf(stderr, "[W%d] Error when receiving message from master... terminating (%d).\n", rank, resval);
        MPI_Abort(MPI_COMM_WORLD, resval);
      }
      counter++;
      if (strcmp (incoming, "end") != 0)
      {
        // process a received job
        seconds = time(NULL);
        fprintf(stderr, "[W%d:%lu] Processing job %d: %s\n", rank, seconds, counter, incoming);

        // create the Terrier FileIndexer command
        // - incoming now contains the identifier of the manifest file to index
        char command[2048];
        if (gsdlhometest != NULL)
        {
          sprintf (command, "%s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-P%d-C%d.log 2>&1", terrier_dir, terrier_dir, incoming, incoming, rank, counter);
        }
        else
        {
          sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-P%d-C%d.log 2>&1\"", gsdlhome_dir, terrier_dir, terrier_dir, incoming, incoming, rank, counter);
        }
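        // For illustration only: with incoming "007", rank 2 and counter 1,
        // the first form above expands to something like
        //   <terrier_dir>/bin/anyclass.sh org.terrier.applications.FileIndexer
        //     -index -path <terrier_dir>/var/manifest-007.spec -prefix 007
        //     > /tmp/terrier-index-P2-C1.log 2>&1
        // where <terrier_dir> is the second command-line argument.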
        fprintf(stderr, "[W%d] system('%s')\n", rank, command);
        // invoke the Terrier indexer
        system(command);

        // send completed message
        char line = ' ';
        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
        seconds = time(NULL);
        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
      }
    }
    while (strcmp (incoming, "end") != 0);
    // stop when "end" instruction is received
    seconds = time(NULL);
    fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds);
  }

  // clean up MPI environment
  if (rank == 0)
  {
    fprintf(stderr,"[M] Finalizing...\n");
  }
  else
  {
    fprintf(stderr,"[W%d] Finalizing...\n", rank);
  }
  MPI_Finalize();
}