- Timestamp:
- 2013-11-20T12:57:27+13:00 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_report.pl
###############################################################################
# hadoop_report.pl — post-change (r28652) body reconstructed from a Trac
# changeset diff rendering.
#
# Walks one or more result directories looking for hadoop.log files and, for
# each directory containing one, generates a timing report (timing.csv) by
# parsing hadoop.log, per-task import logs, and — when present —
# data_locality.csv and per-item convert logs.
#
# NOTE(review): the first ~26 lines of the real file (shebang, strict/warnings,
# the Time::Local import, etc.) are outside the visible diff and are assumed
# unchanged; `timelocal` and `natsort` are presumed imported there and via
# Sort::Key::Natural respectively — confirm against the full file.
###############################################################################

# Configuration: every non-option command-line argument naming a directory is
# searched (recursively) for hadoop.log files to report on.
my $offset = 0;
my $dir_count = 0;
while (defined $ARGV[$offset])
{
  my $argument = $ARGV[$offset];
  if ($argument =~ /^-/)
  {
    # No options are currently understood; they are simply skipped.
    # [fixed] the original only incremented $offset in the else branch, so any
    # "-flag" argument caused an infinite loop here.
  }
  else
  {
    # [fixed] absolute paths were previously broken by unconditionally
    # prefixing the current working directory.
    my $path = ($argument =~ m/^\//) ? $argument : getcwd() . '/' . $argument;
    if (-d $path)
    {
      searchForHadoopLog($path);
      $dir_count++;
    }
  }
  $offset++;
}
if ($dir_count == 0)
{
  die("usage: hadoop_report.pl <path to results>\n");
}

print "\n\n================================== Complete ====================================\n\n";
exit;

## @function searchForHadoopLog($dir)
#  Depth-first search of $dir for directories containing a hadoop.log; each
#  such directory has a report generated for it via generateReport(). Entries
#  whose names begin with '.' are ignored (this also skips '.' and '..').
#  Dies if a directory cannot be opened for reading.
sub searchForHadoopLog
{
  my ($dir) = @_;
  if (-d $dir)
  {
    print ' * Searching for hadoop.log: ' . $dir . "\n";
    my $hadoop_log_path = $dir . '/hadoop.log';
    if (-f $hadoop_log_path)
    {
      generateReport($dir);
    }
    # Search recursively. The directory handle is lexical and is closed before
    # recursing, so nested calls cannot clobber it.
    if (opendir(my $dh, $dir))
    {
      my @files = readdir($dh);
      closedir($dh);
      foreach my $file (@files)
      {
        my $path = $dir . '/' . $file;
        # ignore files starting with .
        if ($file =~ /^\./)
        {
        }
        elsif (-d $path)
        {
          searchForHadoopLog($path);
        }
      }
    }
    else
    {
      die('Error! Failed to open directory for reading: ' . $dir);
    }
  }
}

## @function generateReport($results_path)
#  Parses the logs found in $results_path and writes $results_path/timing.csv
#  containing one master (job) row, one row per compute node (host#cpu), and
#  one row per map task. Dies on unreadable/unwritable files or when no
#  start/end timestamps can be parsed from hadoop.log.
sub generateReport
{
  my ($results_path) = @_;

  # --- Read in hadoop.log and parse the top-level job record -----------------
  print ' * Reading and parsing "hadoop.log"... ';
  my $job_record = {'host'=>'', 'job'=>'', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
  my $hadoop_log_path = $results_path . '/hadoop.log';
  if (open(my $hlin, '<:utf8', $hadoop_log_path))
  {
    while (my $line = <$hlin>)
    {
      if ($line =~ /host:(.+)/)
      {
        $job_record->{'host'} = $1;
      }
      elsif ($line =~ /Running job: job_(\d+_\d+)/)
      {
        $job_record->{'job'} = $1;
      }
      elsif ($line =~ /CPU time spent \(ms\)=(\d+)/)
      {
        $job_record->{'cpu_time'} = $1;
      }
      elsif ($job_record->{'start'} == 0 && $line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
      {
        # First timestamp seen marks the job start (log dates are YY/MM/DD).
        $job_record->{'start'} = timelocal($6, $5, $4, $3, ($2 - 1), $1);
      }
      elsif ($line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
      {
        # Latest timestamp seen marks the job end.
        my $end = timelocal($6, $5, $4, $3, ($2 - 1), $1);
        if ($end > $job_record->{'end'})
        {
          $job_record->{'end'} = $end;
        }
      }
    }
    close($hlin);
    if ($job_record->{'start'} == 0 || $job_record->{'end'} == 0)
    {
      die('Error! Failed to parse timing information from log: ' . $hadoop_log_path);
    }
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $hadoop_log_path);
  }
  print "Done!\n";

  # --- Read in data_locality.csv (will be matched to task logs) --------------
  my $data_was_local = {};
  my $data_locality_csv_path = $results_path . '/data_locality.csv';
  if (-f $data_locality_csv_path)
  {
    print ' * Reading and parsing "data_locality.csv"... ';
    if (open(my $dlin, '<:utf8', $data_locality_csv_path))
    {
      while (my $line = <$dlin>)
      {
        # note that the line may begin with a taskid or just a taskno (legacy)
        if ($line =~ /(\d+),\d,(\d)/)
        {
          $data_was_local->{$1} = $2;
        }
      }
      close($dlin);
    }
    else
    {
      die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
    }
    print "Done!\n";
  }
  else
  {
    print " * Data locality not available or not applicable\n";
  }

  # --- Read in all task logs and parse task records ---------------------------
  my $task_records;
  print " * Locating task logs...\n";
  if (opendir(my $dh, $results_path))
  {
    my @files = readdir($dh);
    foreach my $file (sort @files)
    {
      if ($file =~ /import-hadoop-(\d+_\d+)_m_(\d+)_\d+\.log/)
      {
        my $job_no = $1;
        my $task_no = $2;
        my $is_data_local = 0;
        if (defined ($data_was_local->{$task_no}))
        {
          $is_data_local = $data_was_local->{$task_no};
        }
        my $task_record = {'host'=>'', 'cpu'=>0, 'job' => $job_no, 'task' => $task_no, 'start'=>0, 'end'=>0, 'cpu_time'=>0, 'data_locality'=>$is_data_local, 'file'=>'', 'percom'=>'NA'};
        print ' - Reading and parsing "' . $file . '"... ';
        my $task_log_path = $results_path . '/' . $file;
        my $io_time = 0;
        if (open(my $tin, '<:utf8', $task_log_path))
        {
          my $io_start_time = 0;
          while (my $line = <$tin>)
          {
            if ($line =~ /\[Started:(\d+(?:\.\d+)?)\]/)
            {
              $task_record->{'start'} = $1;
            }
            elsif ($line =~ /\[Host:([^\]]+)\]/)
            {
              $task_record->{'host'} = $1;
            }
            elsif ($line =~ /\[CPU:(\d+)\]/)
            {
              $task_record->{'cpu'} = $1;
            }
            elsif ($line =~ /\[Map:([^\>]+)=>/)
            {
              $task_record->{'file'} = $1;
            }
            elsif ($line =~ /\[IOS:(\d+)\]/)
            {
              # Start of an IO span; paired with the next IOE marker.
              $io_start_time = $1;
            }
            elsif ($io_start_time > 0 && $line =~ /\[IOE:(\d+)\]/)
            {
              $io_time += ($1 - $io_start_time);
              $io_start_time = 0;
            }
            elsif ($line =~ /\[Completed:(\d+(?:\.\d+)?)\]/)
            {
              my $end_time = $1;
              $task_record->{'end'} = $end_time;
              # An IO span still open at completion is closed at end time.
              if ($io_start_time > 0)
              {
                $io_time += ($end_time - $io_start_time);
              }
            }
          }
          close($tin);
          # Calculate CPU time (total time - IO time)
          $task_record->{'cpu_time'} = $task_record->{'end'} - $task_record->{'start'} - $io_time;

          # We should now have the filename - use this and try and locate a
          # convert log for this item (assuming it is multimedia, which it may
          # not be)
          if (defined $task_record->{'file'} && $task_record->{'file'} =~ /\/([^\/]+)\.ts/)
          {
            my $filename_sans_extension = $1;
            my $convert_log = $results_path . '/convert-' . $filename_sans_extension . '.log';
            if (-f $convert_log)
            {
              print '[Reading and parsing convert log]... ';
              if (open(my $clin, '<:utf8', $convert_log))
              {
                # Record the furthest encode progress reported in the log.
                my $max_percent = 0.00;
                while (my $line = <$clin>)
                {
                  if ($line =~ /.*Encoding: task 1 of 1, (\d+\.\d\d) \%/)
                  {
                    my $percent = $1;
                    if ($percent > $max_percent)
                    {
                      $max_percent = $percent;
                    }
                  }
                }
                close($clin);
                $task_record->{'percom'} = $max_percent;
              }
              else
              {
                print STDERR "Warning! Failed to open log file for reading: " . $convert_log . "\n";
              }
            }
          }

          # Store this record (keyed by task number; a retried task would
          # overwrite the earlier attempt's record).
          $task_records->{$task_no} = $task_record;
        }
        else
        {
          die('Error! Failed to open file for reading: ' . $task_log_path);
        }
        print "Done!\n";
      }
    }
    # [fixed] the original called close() on a directory handle; directory
    # handles must be released with closedir().
    closedir($dh);
  }
  else
  {
    die('Error! Failed to open directory for reading: ' . $results_path);
  }
  print ' - Processed ' . scalar(keys(%{$task_records})) . " records\n";

  # --- Generate compute-node records (one per host#cpu pair) ------------------
  print ' * Generating compute node records... ';
  my $node_records;
  foreach my $taskno (sort keys %{$task_records})
  {
    my $task_record = $task_records->{$taskno};
    my $node_record = {'host'=>'', 'cpu'=>0, 'job' => '', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
    # - retrieve any existing record
    my $worker_id = $task_record->{'host'} . '#' . $task_record->{'cpu'};
    if (defined $node_records->{$worker_id})
    {
      $node_record = $node_records->{$worker_id};
    }
    if ($node_record->{'host'} eq '')
    {
      $node_record->{'host'} = $task_record->{'host'};
    }
    if ($node_record->{'cpu'} == 0)
    {
      $node_record->{'cpu'} = $task_record->{'cpu'};
    }
    if ($node_record->{'job'} eq '')
    {
      $node_record->{'job'} = $task_record->{'job'};
    }
    # Node span is the union of its tasks' spans; CPU time is their sum.
    if ($node_record->{'start'} == 0 || $task_record->{'start'} < $node_record->{'start'})
    {
      $node_record->{'start'} = $task_record->{'start'};
    }
    if ($node_record->{'end'} == 0 || $task_record->{'end'} > $node_record->{'end'})
    {
      $node_record->{'end'} = $task_record->{'end'};
    }
    $node_record->{'cpu_time'} += $task_record->{'cpu_time'};
    # - store it
    $node_records->{$worker_id} = $node_record;
  }
  print "Done!\n";

  # --- Write out CSV of all information ---------------------------------------
  my $report_csv_path = $results_path . '/timing.csv';
  if (open(my $csvout, '>:utf8', $report_csv_path))
  {
    my $row_counter = 1;
    # Header
    print $csvout "id,name,hostname,start,end,cputime,dl,pid,filename,percom\n";
    # Master Record (job CPU time is logged in ms; CSV uses seconds)
    print $csvout $row_counter . ',M0,' . $job_record->{'host'} . ',' . $job_record->{'start'} . ',' . $job_record->{'end'} . ',' . ($job_record->{'cpu_time'} / 1000) . ",0,0,NA,NA\n";
    $row_counter++;
    # For each compute node record
    my $known_workers = {};
    foreach my $worker_id (natsort(keys(%{$node_records})))
    {
      my $node_record = $node_records->{$worker_id};
      my $node_id = $row_counter;
      $row_counter++;
      my $csv_worker_id = 'W' . $node_record->{'cpu'};
      # Ensure we haven't used this id before - this should never trigger for
      # multicore CPUs, but will for clusters (as nearly all nodes will report
      # themselves as 'W0')
      if (defined $known_workers->{$csv_worker_id})
      {
        # Find a different worker id as this one is already in use
        my $counter = 0;
        $csv_worker_id = 'W' . $counter;
        while (defined $known_workers->{$csv_worker_id})
        {
          $counter++;
          $csv_worker_id = 'W' . $counter;
        }
      }
      $known_workers->{$csv_worker_id} = 1;
      print $csvout $node_id . ',' . $csv_worker_id . ',' . $node_record->{'host'} . ',' . $node_record->{'start'} . ',' . $node_record->{'end'} . ',' . $node_record->{'cpu_time'} . ",0,1,NA,NA\n";
      # List the child task records
      foreach my $taskno (sort keys %{$task_records})
      {
        my $task_record = $task_records->{$taskno};
        if ($task_record->{'host'} . '#' . $task_record->{'cpu'} eq $worker_id)
        {
          print $csvout $row_counter . ',T' . ($task_record->{'task'} + 0) . ',' . $task_record->{'host'} . ',' . $task_record->{'start'} . ',' . $task_record->{'end'} . ',' . $task_record->{'cpu_time'} . ',' . $task_record->{'data_locality'} . ',' . $node_id . ',' . $task_record->{'file'} . ',' . $task_record->{'percom'} . "\n";
          $row_counter++;
        }
      }
    }
    close($csvout);
  }
  else
  {
    die('Error! Failed to open file for writing: ' . $report_csv_path);
  }
}

1;
Note:
See TracChangeset
for help on using the changeset viewer.