Changeset 25944
- Timestamp:
- 2012-07-13T12:06:06+12:00 (11 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
gs2-extensions/parallel-building/trunk/src/src/mpidspacemediafilter-src/mpidspacemediafilter.cpp
r25810 r25944 24 24 { 25 25 int numtasks, rank, rc; // MPI variables 26 27 if (3 != argc ) 28 { 29 cerr << "Usage: " << argv[0] << " dspacehome filelist" << endl; 26 unsigned long int seconds = 0; 27 28 if (4 != argc ) 29 { 30 fprintf(stderr,"Usage: mpidspacemediafilter gsdlhome dspacehome filelist\n"); 30 31 exit(-1); 31 32 } 32 33 33 char *dspacehomedir = argv[1]; // location of import script 34 char *filelist = argv[2]; 34 char *gsdlhomedir = argv[1]; 35 char *dspacehomedir = argv[2]; // location of import script 36 char *filelist = argv[3]; 35 37 36 38 // start MPI environment … … 38 40 if (rc != MPI_SUCCESS) 39 41 { 40 printf ("Error starting MPI program. Terminating.\n");42 fprintf(stderr, "Error starting MPI program. Terminating.\n"); 41 43 MPI_Abort(MPI_COMM_WORLD, rc); 42 44 } … … 50 52 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 51 53 54 // Get processor name too - important when it could be anywhere in a cluster 55 int name_length; 56 char processor_name[MPI_MAX_PROCESSOR_NAME]; 57 MPI_Get_processor_name(processor_name, &name_length); 58 52 59 // master node processing 53 60 if (rank == 0) 54 61 { 55 cerr << " * Master Starting" << endl; 62 seconds = time(NULL); 63 fprintf(stderr, "[M:%lu] Starting on %s\n", seconds, processor_name); 56 64 char incoming[BUFFERSIZE]; // buffer for acknowledgments 57 65 char buffer[BUFFERSIZE]; // buffer to send tasks … … 66 74 67 75 // set initial status of all processors to idle 68 cerr << " - initializing processor state" << endl;76 fprintf(stderr, "[M] Initializing processor state\n"); 69 77 for ( int j=0; j<BUFFERSIZE; j++ ) 70 78 { … … 73 81 74 82 // scan through contents of file listing 75 cerr << " - processing contents of filelist.txt" << endl;83 fprintf(stderr, "[M] Processing contents of filelist.txt\n"); 76 84 while (!infile.eof ()) 77 85 { … … 83 91 int dest=0; 84 92 int found = 0; 85 cerr << " - searching for idle processor" << endl;93 fprintf(stderr, "[M] Searching for idle processor\n"); 86 94 while ((dest<(numtasks-1)) && (found == 0)) 87 95 { … … 99 107 if (found == 0) 100 108 { 101 cerr << " - waiting for processor to become idle" << endl;109 fprintf(stderr, "[M] Waiting for processor to become idle\n"); 102 110 MPI_Waitany (numtasks-1, request, &dest, status); 103 111 } … … 105 113 // No need for manifests - just send the identifier of the item 106 114 // to filter to the worker 107 cerr << " - writing dspace identifier as instruction to worker" << endl; 115 fprintf(stderr, "[M] Writing dspace identifier as instruction to worker\n"); 116 // Jiggerypokery to get around weird compiler error: cannot pass 117 // objects of non-POD type âstruct std::stringâ through â...â; call 118 // will abort at runtime 108 119 char *line = new char [line_str.size() + 1]; 109 120 strcpy(line, line_str.c_str()); 110 121 sprintf(buffer, "%s", line); 122 delete [] line; 111 123 112 124 // mark processors as busy 113 125 incoming[dest] = 'B'; 114 126 // send out the job to the processor 115 cerr << " - asking worker to start" << endl;127 fprintf(stderr, "[M] Asking worker %d to start\n", (dest + 1)); 116 128 MPI_Send (&buffer, strlen (buffer)+1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD); 117 129 // wait for a done acknowledgement 118 130 MPI_Irecv (&incoming[dest], 1, MPI_CHAR, dest+1, 1, MPI_COMM_WORLD, &request[dest]); 119 cerr << " - worker replied that it has started" << endl;131 fprintf(stderr, "[M] Worker %d replied that it has started\n", (dest + 1)); 120 132 // update counter of actual tasks 121 133 if (dest > actualTasks) 122 134 { 123 135 actualTasks = dest; 124 cerr << " - increased the number of running workers to: " << actualTasks << endl;136 fprintf(stderr, "[M] Increased the number of running workers to: %d\n", actualTasks); 125 137 } 126 138 } … … 130 142 131 143 // wait until all outstanding tasks are completed 132 cerr << " - waiting for all outstanding tasks to complete" << endl;144 fprintf(stderr, "[M] Waiting for all outstanding tasks to complete\n"); 133 145 int dest; 134 146 for ( int k=0; k<actualTasks; k++ ) … … 138 150 139 151 // send message to end all processing engines 140 cerr << " * Master asking children to exit" << endl;152 fprintf(stderr,"[M] Master asking children to exit\n"); 141 153 char endstr[5] = "end"; 142 154 for ( int i=1; i<numtasks; i++ ) … … 145 157 } 146 158 159 seconds = time(NULL); 160 fprintf(stderr, "[M:%lu] Master will exit when workers complete\n", seconds); 147 161 } 148 162 // slave node processing 149 163 else 150 164 { 151 ///out << "Worker Starting" << endl; 165 seconds = time(NULL); 166 fprintf(stderr, "[W%d:%lu] Starting on %s\n", rank, seconds, processor_name); 167 168 // Check to see if GSDLHOME exists in the environment (it will on multicore 169 // computer, but won't on compute nodes in a cluster). It will be NULL if 170 // source setup.bash hasn't been run (on this computer). 171 const char * gsdlhometest = getenv("GSDLHOME"); 172 152 173 char incoming[BUFFERSIZE]; 153 154 174 int counter = 0; 155 156 175 do 157 176 { … … 160 179 if (resval != MPI_SUCCESS) 161 180 { 162 printf ("Error when recieving message from master [%d]... Terminating.\n", resval);181 fprintf(stderr, "[W%d] Error when recieving message from master... terminating (%d).\n", rank, resval); 163 182 MPI_Abort(MPI_COMM_WORLD, rc); 164 183 } … … 167 186 { 168 187 // process a received job 169 cerr << "Worker Processing [" << rank << ":" << incoming << "]" << endl; 188 seconds = time(NULL); 189 fprintf(stderr, "[W%d:%lu] Processing: %s\n", rank, seconds, incoming, counter); 170 190 171 191 // create DSpace filter-media command 172 192 // - incoming now contains the identifier of the item to filter 173 193 char command[2048]; 174 175 sprintf (command, "%s/bin/dspace filter-media -f -i \"%s\" > /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter); 176 177 cerr << "**** cmd = " << command << endl; 178 194 if (gsdlhometest != NULL) 195 { 196 sprintf (command, "%s/bin/dspace filter-media -f -i \"%s\" > /tmp/dspace_media_filter-P%d-C%d.log 2>&1", dspacehomedir, incoming, rank, counter); 197 } 198 else 199 { 200 sprintf (command, "bash -c \"cd %s && source setup.bash > /dev/null && %s/bin/dspace filter-media -f -i %s > /tmp/dspace_media_filter-P%d-C%d.log 2>&1\"", gsdlhomedir, dspacehomedir, incoming, rank, counter); 201 } 202 fprintf(stderr, "[W%d] system('%s')\n", rank, command); 179 203 // invoke dspace 180 204 system (command); 181 cerr << "**** complete" << endl; 205 206 // send completed message 182 207 char line = ' '; 183 // send completed message184 208 MPI_Send (&line, 1, MPI_CHAR, 0, 1, MPI_COMM_WORLD); 209 seconds = time(NULL); 210 fprintf(stderr, "[W%d:%lu] Process complete\n", rank, seconds); 185 211 } 186 212 } 187 213 while (strcmp (incoming, "end") != 0); 188 214 // stop when "end" instruction is received 189 cerr << "Worker Exiting" << endl; 215 seconds = time(NULL); 216 fprintf(stderr, "[W%d:%lu] Worker exiting\n", rank, seconds); 190 217 } 191 218 192 219 // clean up MPI environment 193 cerr << "Finalizing..." << endl; 220 if (rank == 0) 221 { 222 fprintf(stderr,"[M] Finalizing...\n"); 223 } 224 else 225 { 226 fprintf(stderr,"[W%d] Finalizing...\n", rank); 227 } 194 228 MPI_Finalize(); 195 229 }
Note:
See TracChangeset
for help on using the changeset viewer.