source: gs2-extensions/parallel-building/trunk/src/src/mpiterrierfileindexer-src/mpiterrierfileindexer.cpp@ 26239

Last change on this file since 26239 was 26239, checked in by jmt12, 12 years ago

Modifications to progress messages to improve extracting information from the logs in an automated fashion

  • Property svn:executable set to *
File size: 6.9 KB
Line 
1// Master-worker program to prepare, parallel index, and then merge a terrier
2// based collection
3//
4// John Thompson
5// 3rd August 2012
6
#include "mpi.h"

#include <cstring>
#include <ctime>
#include <stdio.h>
#include <stdlib.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
17
18using namespace std;
19
20#define BUFFERSIZE 512
21
22int
23main ( int argc, char *argv [] )
24{
25 int numtasks, rank, rc; // MPI variables
26 unsigned long int seconds = 0;
27
28 if (4 != argc )
29 {
30 fprintf(stderr,"Usage: mpiterrierfileindexer <path to gsdl> <path to terrier> <number of manifest files>\n");
31 exit(-1);
32 }
33
34 char *gsdlhome_dir = argv[1];
35 char *terrier_dir = argv[2];
36 int total_number_of_manifest_files = atoi(argv[3]);
37
38 // start MPI environment
39 rc = MPI_Init(&argc,&argv);
40 if (rc != MPI_SUCCESS)
41 {
42 fprintf(stderr, "Error starting MPI program. Terminating.\n");
43 MPI_Abort(MPI_COMM_WORLD, rc);
44 }
45
46 // We'll handle errors ourselves
47 MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
48
49 // get MPI variables: number of processors and processor number
50 MPI_Status stat;
51 MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
52 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
53
54 // Get processor name too - important when it could be anywhere in a cluster
55 int name_length;
56 char processor_name[MPI_MAX_PROCESSOR_NAME];
57 MPI_Get_processor_name(processor_name, &name_length);
58
59 // master node processing
60 if (rank == 0)
61 {
62 seconds = time(NULL);
63 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
64 char incoming[BUFFERSIZE]; // buffer for acknowledgments
65 char buffer[BUFFERSIZE]; // buffer to send tasks
66 MPI_Request request[BUFFERSIZE]; // request monitor for all tasks
67 MPI_Status status[BUFFERSIZE]; // status monitor for all tasks
68 int actualTasks = 0; // number of processors running
69 int manifest_counter = 0;
70
71 // set initial status of all processors to idle
72 fprintf(stderr, "[M] Initializing processor state\n");
73 for ( int j=0; j<BUFFERSIZE; j++ )
74 {
75 incoming[j] = ' ';
76 }
77
78 // scan through contents of file listing
79 fprintf(stderr, "[M] Running work process for each manifest file\n");
80 while (manifest_counter < total_number_of_manifest_files)
81 {
82 // search for idle processor
83 int dest=0;
84 int found = 0;
85 fprintf(stderr, "[M] Searching for idle processor\n");
86 while ((dest<(numtasks-1)) && (found == 0))
87 {
88 if (incoming[dest] == ' ')
89 {
90 found = 1;
91 }
92 else
93 {
94 dest++;
95 }
96 }
97
98 // if no idle processor, wait for one to become idle
99 if (found == 0)
100 {
101 fprintf(stderr, "[M] Waiting for processor to become idle\n");
102 MPI_Waitany (numtasks-1, request, &dest, status);
103 }
104
105 // Send manifest number (as 0 padded 3 digit string) to the worker
106 fprintf(stderr, "[M] Writing manifest number as instruction to worker\n");
107 // Jiggerypokery to get around weird compiler error: cannot pass
108 // objects of non-POD type ‘struct std::string’ through ‘...’; call
109 // will abort at runtime
110 sprintf(buffer, "%03d", manifest_counter);
111
112 // mark processors as busy
113 incoming[dest] = 'B';
114 // send out the job to the processor
115 fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1));
116 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
117 // wait for a done acknowledgement
118 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
119 fprintf(stderr, "[M] Worker %d replied that it has started\n", (dest + 1));
120 // update counter of actual tasks
121 if (dest > actualTasks)
122 {
123 actualTasks = dest;
124 fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks);
125 }
126
127 // onto the next manifest file
128 manifest_counter++;
129 }
130
131 // wait until all outstanding tasks are completed
132 fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n");
133 int dest;
134 for ( int k=0; k<actualTasks; k++ )
135 {
136 MPI_Waitany (actualTasks, request, &dest, status);
137 }
138
139 // send message to end all processing engines
140 fprintf(stderr,"[M] Master asking children to exit\n");
141 char endstr[5] = "end";
142 for ( int i=1; i<numtasks; i++ )
143 {
144 MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
145 }
146
147 seconds = time(NULL);
148 fprintf(stderr, "[M:%lu] Exiting\n", seconds);
149 }
150
151 // worker node processing
152 else
153 {
154 seconds = time(NULL);
155 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
156
157 // Check to see if GSDLHOME exists in the environment (it will on multicore
158 // computer, but won't on compute nodes in a cluster). It will be NULL if
159 // source setup.bash hasn't been run (on this computer).
160 const char * gsdlhometest = getenv("GSDLHOME");
161
162 char incoming[BUFFERSIZE];
163 int counter = 0;
164 do
165 {
166 // wait for instruction from master
167 int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
168 if (resval != MPI_SUCCESS)
169 {
170 fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
171 MPI_Abort(MPI_COMM_WORLD, rc);
172 }
173 counter++;
174 if (strcmp (incoming, "end") != 0)
175 {
176 // process a received job
177 seconds = time(NULL);
178 fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
179
180 // create DSpace filter-media command
181 // - incoming now contains the identifier of the item to filter
182 char command[2048];
183 if (gsdlhometest != NULL)
184 {
185 sprintf (command, "%s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-P%d-C%d.log 2>&1", terrier_dir, terrier_dir, incoming, incoming, rank, counter);
186 }
187 else
188 {
189 sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-P%d-C%d.log 2>&1\"", gsdlhome_dir, terrier_dir, terrier_dir, incoming, incoming, rank, counter);
190 }
191 fprintf(stderr, "[W%d] system('%s')\n", rank, command);
192 // invoke dspace
193 system(command);
194
195 // send completed message
196 char line = ' ';
197 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
198 seconds = time(NULL);
199 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
200 }
201 }
202 while (strcmp (incoming, "end") != 0);
203 // stop when "end" instruction is received
204 seconds = time(NULL);
205 fprintf(stderr, "[W%d:%lu] Exiting\n", rank, seconds);
206 }
207
208 // clean up MPI environment
209 if (rank == 0)
210 {
211 fprintf(stderr,"[M] Finalizing...\n");
212 }
213 else
214 {
215 fprintf(stderr,"[W%d] Finalizing...\n", rank);
216 }
217 MPI_Finalize();
218}
Note: See TracBrowser for help on using the repository browser.