Changeset 25839
- Timestamp: 2012-06-28T09:17:21+12:00 (11 years ago)
- File: 1 edited
Legend:
- Unmodified
- Added
- Removed
-
gs2-extensions/parallel-building/trunk/src/src/mpiimport-src/mpiimport.cpp
r24833 r25839 18 18 #include <stdio.h> 19 19 #include <stdlib.h> 20 #include <time.h> 20 21 21 22 #include <fstream> … … 34 35 { 35 36 int numtasks, rank, rc; // MPI variables 37 unsigned long int seconds = 0; 36 38 37 39 if (5 != argc && argc != 6 ) 38 40 { 39 cerr << "Usage: " << argv[0] << " filelist epoch gsdlhome collection [site]" << endl;41 fprintf(stderr, "Usage: mpiimport filelist epoch gsdlhome collection [site]\n"); 40 42 exit(-1); 41 43 } … … 46 48 char *gsdlhomedir = argv[3]; // location of import script 47 49 char *collection = argv[4]; // Greenstone collection 48 char *site = "";50 char *site = NULL; 49 51 if (argc == 6) 50 52 { … … 56 58 if (rc != MPI_SUCCESS) 57 59 { 58 printf ("Error starting MPI program. Terminating.\n");60 fprintf(stderr, "Error starting MPI program. Terminating.\n"); 59 61 MPI_Abort(MPI_COMM_WORLD, rc); 60 62 } … … 68 70 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 69 71 72 // Get processor name too - important when it could be anywhere in a cluster 73 int name_length; 74 char processor_name[MPI_MAX_PROCESSOR_NAME]; 75 MPI_Get_processor_name(processor_name, &name_length); 76 70 77 // master node processing 71 78 if (rank == 0) 72 79 { 73 cerr << " * Master Starting" << endl; 80 seconds = time(NULL); 81 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name); 74 82 char incoming[BUFFERSIZE]; // buffer for acknowledgments 75 83 char buffer[BUFFERSIZE]; // buffer to send tasks … … 86 94 87 95 // set initial status of all processors to idle 88 cerr << " - initializing processor state" << endl;96 fprintf(stderr, "[M] Initializing processor state\n"); 89 97 for ( int j=0; j<BUFFERSIZE; j++ ) 90 98 { … … 94 102 // scan through contents of file listing 95 103 int manifest_file_count = 1; 96 cerr << " - processing contents of filelist.txt" << endl;104 fprintf(stderr, "[M] Processing contents of filelist.txt\n"); 97 105 while (!infile.eof ()) 98 106 { 99 107 // get a filename 100 108 getline (infile, line); 109 110 if ( 
line.length() == 0 && !infile.eof() ) 111 { 112 fprintf(stderr, "[M] Warning! Read empty string from filelist: %s", filelist); 113 } 114 101 115 if (line.length() > 0) 102 116 { … … 104 118 int dest=0; 105 119 int found = 0; 106 cerr << " - searching for idle processor" << endl;120 fprintf(stderr, "[M] Searching for idle processor\n"); 107 121 while ((dest<(numtasks-1)) && (found == 0)) 108 122 { … … 120 134 if (found == 0) 121 135 { 122 cerr << " - waiting for processor to become idle" << endl;136 fprintf(stderr, "[M] Waiting for processor to become idle\n"); 123 137 MPI_Waitany (numtasks-1, request, &dest, status); 124 138 } 125 139 126 140 // construct manifest filename 127 cerr << " - creating manifest file: number " << manifest_file_count << endl;141 fprintf(stderr, "[M] Creating manifest file: %d\n", manifest_file_count); 128 142 stringstream manifestfilename_strstr; 129 143 manifestfilename_strstr << gsdlhomedir << "/collect/" << collection << "/tmp/manifest." << manifest_file_count << ".xml"; … … 138 152 if (manifestfile.fail()) 139 153 { 140 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;154 fprintf(stderr, "[M] Fatal Error! Failed to open temporary manifest file for writing: %s\n", manifestfilename); 141 155 } 142 156 } 143 157 else 144 158 { 145 cerr << "Fatal Error! Failed to open temporary manifest file for writing: " << manifestfile << endl;159 fprintf(stderr, "[M] Fatal Error! 
Failed to open temporary manifest file for writing: %s\n", manifestfilename); 146 160 } 147 161 manifestfile << "<Manifest><Index>" << endl; … … 165 179 manifestfile.close (); 166 180 167 cerr << " - manifest file complete" << endl;181 fprintf(stderr, "[M0] Manifest file complete\n"); 168 182 169 183 // Store manifest filepath so we can remove it later … … 171 185 172 186 // Send the manifest filename as the instruction 173 cerr << " - writing manifest filename as instruction to worker" << endl;187 fprintf(stderr, "[M0] Writing manifest filename as instruction to worker\n"); 174 188 sprintf(buffer, "%s", manifestfilename); 175 189 … … 177 191 incoming[dest] = 'B'; 178 192 // send out the job to the processor 179 cerr << " - asking worker to start" << endl;193 fprintf(stderr, "[M0] Asking worker %d to start\n", (dest + 1)); 180 194 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); 181 195 // wait for a done acknowledgement 182 196 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); 183 cerr << " - worker replied that it has started" << endl;197 fprintf(stderr, "[M0] Worker %d replied that it has started\n", (dest + 1)); 184 198 // update counter of actual tasks 185 199 if (dest > actualTasks) 186 200 { 187 201 actualTasks = dest; 188 cerr << " - increased the number of running workers to: " << actualTasks << endl;202 fprintf(stderr, "[M0] Increased the number of running workers to: %d\n", actualTasks); 189 203 } 190 204 // increase number of manifest files processed … … 196 210 197 211 // wait until all outstanding tasks are completed 198 cerr << " - waiting for all outstanding tasks to complete" << endl;212 fprintf(stderr, "[M0] Waiting for all outstanding tasks to complete\n"); 199 213 int dest; 200 214 for ( int k=0; k<actualTasks; k++ ) … … 204 218 205 219 // send message to end all processing engines 206 cerr << " * Master asking children to exit" << endl;220 fprintf(stderr, "[M0] Master asking children 
to exit\n"); 207 221 char endstr[5] = "end"; 208 222 for ( int i=1; i<numtasks; i++ ) … … 212 226 213 227 // Free up manifest files 214 cerr << " - freeing shared memory used by manifest files" << endl;228 fprintf(stderr, "[M0] Freeing shared memory used by manifest files\n"); 215 229 for ( int i = 0; i < manifest_files.size(); i++) 216 230 { 217 231 free(manifest_files[i]); 218 232 } 219 cerr << " * Master Exiting" << endl; 220 } 221 // slave node processing 233 seconds = time(NULL); 234 fprintf(stderr, "[M0:%lu] Master will exit when workers complete\n", seconds); 235 } 236 // worker node processing 222 237 else 223 238 { 224 ///out << "Worker Starting" << endl; 239 seconds = time(NULL); 240 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name); 241 242 // Check to see if GSDLHOME exists in the environment (it will on multicore 243 // computer, but won't on compute nodes in a cluster). It will be NULL if 244 // source setup.bash hasn't been run (on this computer). 245 const char * gsdlhometest = getenv("GSDLHOME"); 246 225 247 char incoming[BUFFERSIZE]; 226 227 248 int counter = 0; 228 229 249 do 230 250 { … … 233 253 if (resval != MPI_SUCCESS) 234 254 { 235 printf ("Error when recieving message from master [%d]... Terminating.\n", resval);255 fprintf(stderr, "[W%d] Error when recieving message from master... 
terminating (%d).\n", rank, resval); 236 256 MPI_Abort(MPI_COMM_WORLD, rc); 237 257 } … … 240 260 { 241 261 // process a received job 242 cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl;243 262 seconds = time(NULL); 263 fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter); 244 264 // create Greenstone import command 245 265 // - incoming now contains the name of the manifest file to process 246 266 char command[2048]; 247 248 267 #ifdef HARDAFFINITY 249 268 int cpu = rank - 1; 250 cerr << "Setting affinity for worker " << rank << " to cpu " << cpu << endl; 251 if (site != "") 252 { 253 sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", cpu, gsdlhomedir, incoming, site, collection); 269 fprintf(stderr, "[W%d] Affinity fixed to CPU %d", rank, cpu); 270 char affinity[16]; 271 sprintf(affinity, "taskset -c %d", cpu); 272 #else 273 char affinity[16] = ""; 274 #endif 275 if (site != NULL && strlen(site) > 0) 276 { 277 if (gsdlhometest != NULL) 278 { 279 sprintf(command, "%s import.pl -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, site, collection, rank, counter); 280 } 281 else 282 { 283 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -verbosity 0 -keepold -manifest %s -site %s %s > /tmp/import-W%d-%d.log 2>&1", gsdlhomedir, affinity, incoming, site, collection, rank, counter); 284 } 285 } 286 else if (gsdlhometest != NULL) 287 { 288 sprintf(command, "%s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1", affinity, incoming, collection, rank, counter); 254 289 } 255 290 else 256 291 { 257 sprintf (command, "taskset -c %d %s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", cpu, gsdlhomedir, incoming, collection); 258 } 259 #else 260 if (site != "") 261 { 262 sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s -site %s %s", gsdlhomedir, 
incoming, site, collection); 263 } 264 else 265 { 266 //sprintf (command, "%s/bin/script/import.pl -verbosity 0 -keepold -manifest %s %s", gsdlhomedir, incoming, collection); 267 sprintf (command, "%s/bin/script/import.pl -keepold -manifest %s %s > %s/collect/%s/logs/import-p%d-%d.log 2>&1", gsdlhomedir, incoming, collection, gsdlhomedir, collection, rank, counter); 268 } 269 #endif 270 271 cerr << "**** cmd = " << command << endl; 292 sprintf(command, "bash -c \"cd %s && source setup.bash > /dev/null && %s import.pl -keepold -manifest %s %s > /tmp/import-W%d-%d.log 2>&1\"", affinity, gsdlhomedir, incoming, collection, rank, counter); 293 } 294 295 fprintf(stderr, "[W%d] system('%s')\n", rank, command); 272 296 273 297 // invoke Greenstone import with manifest file 274 298 system (command); 275 cerr << "**** complete" << endl; 299 seconds = time(NULL); 300 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds); 276 301 char line = ' '; 277 302 // send completed message … … 281 306 while (strcmp (incoming, "end") != 0); 282 307 // stop when "end" instruction is received 283 cerr << "Worker Exiting" << endl; 284 } 285 286 // clean up MPI environment 287 cerr << "Finalizing..." << endl; 308 seconds = time(NULL); 309 fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds); 310 } 311 ///err << "Finalizing..." << endl; 288 312 MPI_Finalize(); 289 313 }
Note: See TracChangeset for help on using the changeset viewer.