root/gs2-extensions/parallel-building/trunk/src/src/mpiterrierfileindexer-src/mpiterrierfileindexer.cpp @ 26983

Revision 26983, 6.9 KB (checked in by jmt12, 8 years ago)

Changed the prefix in a log filename to a 'W' for worker rather than a 'P' for processor, just to tidy terminology (I guess - not sure why I made such a minor change)

  • Property svn:executable set to *
Line 
1// Master-worker program to prepare, parallel index, and then merge a terrier
2// based collection
3//
4// John Thompson
5// 3rd August 2012
6
#include "mpi.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
17
using namespace std;

// Size of the message buffers, and ALSO the capacity of the master's
// per-worker bookkeeping arrays (incoming/request/status) - so this
// effectively caps the job at 512 worker ranks.
#define BUFFERSIZE 512
21
22int
23main ( int argc, char *argv [] )
24{
25  int numtasks, rank, rc;            // MPI variables
26  unsigned long int seconds = 0;
27
28  if (4 != argc )
29  {
30    fprintf(stderr,"Usage: mpiterrierfileindexer <path to gsdl> <path to terrier> <number of manifest files>\n");
31    exit(-1);
32  }
33
34  char *gsdlhome_dir = argv[1];
35  char *terrier_dir = argv[2];
36  int total_number_of_manifest_files = atoi(argv[3]);
37
38  // start MPI environment
39  rc = MPI_Init(&argc,&argv);
40  if (rc != MPI_SUCCESS)
41  {
42    fprintf(stderr, "Error starting MPI program. Terminating.\n");
43    MPI_Abort(MPI_COMM_WORLD, rc);
44  }
45
46  // We'll handle errors ourselves
47  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
48
49  // get MPI variables: number of processors and processor number
50  MPI_Status stat;
51  MPI_Comm_size(MPI_COMM_WORLD, &numtasks);
52  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
53
54  // Get processor name too - important when it could be anywhere in a cluster
55  int name_length;
56  char processor_name[MPI_MAX_PROCESSOR_NAME];
57  MPI_Get_processor_name(processor_name, &name_length);
58
59  // master node processing
60  if (rank == 0)
61  {
62    seconds = time(NULL);
63    fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name);
64    char incoming[BUFFERSIZE];          // buffer for acknowledgments
65    char buffer[BUFFERSIZE];         // buffer to send tasks
66    MPI_Request request[BUFFERSIZE];    // request monitor for all tasks
67    MPI_Status status[BUFFERSIZE];      // status monitor for all tasks
68    int actualTasks = 0;         // number of processors running
69    int manifest_counter = 0;
70
71    // set initial status of all processors to idle
72    fprintf(stderr, "[M] Initializing processor state\n");
73    for ( int j=0; j<BUFFERSIZE; j++ )
74    {
75      incoming[j] = ' ';
76    }
77
78    // scan through contents of file listing
79    fprintf(stderr, "[M] Running work process for each manifest file\n");
80    while (manifest_counter < total_number_of_manifest_files)
81    {
82      // search for idle processor
83      int dest=0;
84      int found = 0;
85      fprintf(stderr, "[M] Searching for idle processor\n");
86      while ((dest<(numtasks-1)) && (found == 0))
87      {
88        if (incoming[dest] == ' ')
89        {
90          found = 1;
91        }
92        else
93        {
94          dest++;
95        }
96      }
97
98      // if no idle processor, wait for one to become idle
99      if (found == 0)
100      {
101        fprintf(stderr, "[M] Waiting for processor to become idle\n");
102        MPI_Waitany (numtasks-1, request, &dest, status);
103      }
104
105      // Send manifest number (as 0 padded 3 digit string) to the worker
106      fprintf(stderr, "[M] Writing manifest number as instruction to worker\n");
107      // Jiggerypokery to get around weird compiler error: cannot pass
108      // objects of non-POD type ‘struct std::string’ through ‘...’; call
109      // will abort at runtime
110      sprintf(buffer, "%03d", manifest_counter);
111
112      // mark processors as busy
113      incoming[dest] = 'B';
114      // send out the job to the processor
115      fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1));
116      MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD);
117      // wait for a done acknowledgement
118      MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]);
119      fprintf(stderr, "[M] Worker %d replied that it has started\n", (dest + 1));
120      // update counter of actual tasks
121      if (dest > actualTasks)
122      {
123        actualTasks = dest;
124        fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks);
125      }
126
127      // onto the next manifest file
128      manifest_counter++;
129    }
130
131    // wait until all outstanding tasks are completed
132    fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n");
133    int dest;
134    for ( int k=0; k<actualTasks; k++ )
135    {
136      MPI_Waitany (actualTasks, request, &dest, status);
137    }
138
139    // send message to end all processing engines
140    fprintf(stderr,"[M] Master asking children to exit\n");
141    char endstr[5] = "end";
142    for ( int i=1; i<numtasks; i++ )
143    {
144      MPI_Send (endstr, 4, MPI_CHAR, i, 1, MPI_COMM_WORLD);
145    }
146
147    seconds = time(NULL);
148    fprintf(stderr, "[M:%lu] Exiting\n", seconds);
149  }
150
151  // worker node processing
152  else
153  {
154    seconds = time(NULL);
155    fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name);
156
157    // Check to see if GSDLHOME exists in the environment (it will on multicore
158    // computer, but won't on compute nodes in a cluster). It will be NULL if
159    // source setup.bash hasn't been run (on this computer).
160    const char * gsdlhometest = getenv("GSDLHOME");
161
162    char incoming[BUFFERSIZE];
163    int counter = 0;
164    do
165    {
166      // wait for instruction from master
167      int resval = MPI_Recv (&incoming, BUFFERSIZE, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &stat);
168      if (resval != MPI_SUCCESS)
169      {
170        fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval);
171        MPI_Abort(MPI_COMM_WORLD, rc);
172      }
173      counter++;
174      if (strcmp (incoming, "end") != 0)
175      {
176        // process a received job
177        seconds = time(NULL);
178        fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter);
179
180        // create DSpace filter-media command
181        // - incoming now contains the identifier of the item to filter
182        char command[2048];
183        if (gsdlhometest != NULL)
184        {
185          sprintf (command, "%s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-W%d-C%d.log 2>&1", terrier_dir, terrier_dir, incoming, incoming, rank, counter);
186        }
187        else
188        {
189          sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/anyclass.sh org.terrier.applications.FileIndexer -index -path %s/var/manifest-%s.spec -prefix %s > /tmp/terrier-index-W%d-C%d.log 2>&1\"", gsdlhome_dir, terrier_dir, terrier_dir, incoming, incoming, rank, counter);
190        }
191        fprintf(stderr, "[W%d] system('%s')\n", rank, command);
192        // invoke dspace
193        system(command);
194
195        // send completed message
196        char line = ' ';
197        MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD);
198        seconds = time(NULL);
199        fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds);
200      }
201    }
202    while (strcmp (incoming, "end") != 0);
203    // stop when "end" instruction is received
204    seconds = time(NULL);
205    fprintf(stderr, "[W%d:%lu] Exiting\n", rank, seconds);
206  }
207
208  // clean up MPI environment
209  if (rank == 0)
210  {
211    fprintf(stderr,"[M] Finalizing...\n");
212  }
213  else
214  {
215    fprintf(stderr,"[W%d] Finalizing...\n", rank);
216  }
217  MPI_Finalize();
218}
Note: See TracBrowser for help on using the browser.