source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_report.pl@ 28652

Last change on this file since 28652 was 28652, checked in by jmt12, 10 years ago

Changes to support running the reports over logs produced from multicore machines (Karearea), whereas previously I'd only supported clusters (Medusa). The biggest issue is that all workers claim to be 'W0', so you need to differentiate by core rather than by node.

  • Property svn:executable set to *
File size: 11.7 KB
#!/usr/bin/perl

use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # Ensure Greenstone Perl locations are in INC
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # we'll need the perl version number
  my ($version_number) = `perl-version.pl`;
  if (defined $ENV{'GSDLEXTS'})
  {
    my @extensions = split(/:/, $ENV{'GSDLEXTS'});
    foreach my $e (@extensions)
    {
      my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
      unshift (@INC, $ext_prefix . '/perllib');
      unshift (@INC, $ext_prefix . '/perllib/cpan');
      unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}

use Cwd;
use Devel::Peek;
use Sort::Key::Natural qw(natsort);
use Time::Local;

print "\n===== GS Hadoop Report =====\n";
print "Generate a report (CSV) from the output of a parallel processing Green-\n";
print "stone import, suitable for feeding into the Gantt chart generator.\n";
print "============================\n\n";
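
# Usage sketch (directory names are hypothetical): each non-flag argument is
# resolved relative to the current working directory and searched recursively
# for hadoop.log files, e.g.
#   cd /path/to/gsimport-results && hadoop_report.pl multicore-run cluster-run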

# Configuration
my $offset = 0;
my $dir_count = 0;
while (defined $ARGV[$offset])
{
  my $argument = $ARGV[$offset];
  if ($argument =~ /^\-(.*)/)
  {
    # no flags are supported yet - any encountered are simply skipped
  }
  else
  {
    my $path = getcwd() . '/' . $argument;
    if (-d $path)
    {
      &searchForHadoopLog($path);
      $dir_count++;
    }
  }
  # always advance, even past flag arguments, so the loop cannot stall
  $offset++;
}
if ($dir_count == 0)
{
  die("usage: hadoop_report.pl <path to results>\n");
}
print "\n\n================================== Complete ====================================\n\n";
exit;

sub searchForHadoopLog
{
  my ($dir) = @_;
  if (-d $dir)
  {
    print ' * Searching for hadoop.log: ' . $dir . "\n";
    my $hadoop_log_path = $dir . '/hadoop.log';
    if (-f $hadoop_log_path)
    {
      &generateReport($dir);
    }
    # search recursively
    if (opendir(DH, $dir))
    {
      my @files = readdir(DH);
      closedir(DH);
      foreach my $file (@files)
      {
        my $path = $dir . '/' . $file;
        # ignore files starting with .
        if ($file =~ /^\./)
        {
        }
        elsif (-d $path)
        {
          &searchForHadoopLog($path);
        }
      }
    }
    else
    {
      die('Error! Failed to open directory for reading: ' . $dir);
    }
  }
}

sub generateReport
{
  my ($results_path) = @_;

  # Read in hadoop.log and parse top level record
  print ' * Reading and parsing "hadoop.log"... ';
  my $job_record = {'host'=>'', 'job'=>'', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
  my $hadoop_log_path = $results_path . '/hadoop.log';
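  # The patterns below pull the job-level details out of hadoop.log.
  # Hypothetical sketches of the kinds of lines matched (exact layout varies
  # with Hadoop version):
  #   ... host:medusa-master ...
  #   ... Running job: job_201304251023_0001 ...
  #   ... CPU time spent (ms)=123456 ...
  # Log lines are assumed to begin with a YY/MM/DD HH:MM:SS timestamp; the
  # first timestamp seen becomes the job start, the latest one the job end.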
  if (open(HLIN, '<:utf8', $hadoop_log_path))
  {
    while (my $line = <HLIN>)
    {
      if ($line =~ /host:(.+)/)
      {
        $job_record->{'host'} = $1;
      }
      elsif ($line =~ /Running job: job_(\d+_\d+)/)
      {
        $job_record->{'job'} = $1;
      }
      elsif ($line =~ /CPU time spent \(ms\)=(\d+)/)
      {
        $job_record->{'cpu_time'} = $1;
      }
      elsif ($job_record->{'start'} == 0 && $line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
      {
        $job_record->{'start'} = timelocal($6, $5, $4, $3, ($2 - 1), $1);
      }
      elsif ($line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
      {
        my $end = timelocal($6, $5, $4, $3, ($2 - 1), $1);
        if ($end > $job_record->{'end'})
        {
          $job_record->{'end'} = $end;
        }
      }
    }
    close(HLIN);
    if ($job_record->{'start'} == 0 || $job_record->{'end'} == 0)
    {
      die('Error! Failed to parse timing information from log: ' . $hadoop_log_path);
    }
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $hadoop_log_path);
  }
  print "Done!\n";

  # Read in data_locality.csv (will be matched to task logs)
  my $data_was_local = {};
  my $data_locality_csv_path = $results_path . '/data_locality.csv';
  if (-f $data_locality_csv_path)
  {
    print ' * Reading and parsing "data_locality.csv"... ';
    if (open(DLIN, '<:utf8', $data_locality_csv_path))
    {
      while (my $line = <DLIN>)
      {
        # note that the line may begin with a taskid or just a taskno (legacy)
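        # Hypothetical example rows, "<task number>,<n>,<was data local?>"
        # (the middle single-digit field is assumed to be an attempt number):
        #   3,0,1
        #   000017,0,0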
        if ($line =~ /(\d+),\d,(\d)/)
        {
          $data_was_local->{$1} = $2;
        }
      }
      close(DLIN);
    }
    else
    {
      die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
    }
    print "Done!\n";
  }
  else
  {
    print " * Data locality not available or not applicable\n";
  }

  # Read in all task logs and parse task records
  my $task_records = {}; # initialised so the record count below is safe even if no task logs are found
  print " * Locating task logs...\n";
  if (opendir(DH, $results_path))
  {
    my @files = readdir(DH);
    foreach my $file (sort @files)
    {
      if ($file =~ /import-hadoop-(\d+_\d+)_m_(\d+)_\d+\.log/)
      {
        my $job_no = $1;
        my $task_no = $2;
        my $is_data_local = 0;
        if (defined ($data_was_local->{$task_no}))
        {
          $is_data_local = $data_was_local->{$task_no};
        }
        my $task_record = {'host'=>'', 'cpu'=>0, 'job' => $job_no, 'task' => $task_no, 'start'=>0, 'end'=>0, 'cpu_time'=>0, 'data_locality'=>$is_data_local, 'file'=>'', 'percom'=>'NA'};
        print ' - Reading and parsing "' . $file . '"... ';
        my $task_log_path = $results_path . '/' . $file;
        my $io_time = 0;
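        # Task logs are expected to carry bracketed marker fields; the lines
        # below are hypothetical sketches of what each pattern matches:
        #   [Host:compute-0-1] [CPU:2] [Started:1367212800.25]
        #   [Map:/user/jmt12/import/clip01.ts=>...]
        #   [IOS:1367212805] ... [IOE:1367212890]  (IO start/end pairs, may repeat)
        #   [Completed:1367213100.75]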
        if (open(TIN, '<:utf8', $task_log_path))
        {
          my $io_start_time = 0;
          while (my $line = <TIN>)
          {
            if ($line =~ /\[Started:(\d+(?:\.\d+)?)\]/)
            {
              $task_record->{'start'} = $1;
            }
            elsif ($line =~ /\[Host:([^\]]+)\]/)
            {
              $task_record->{'host'} = $1;
            }
            elsif ($line =~ /\[CPU:(\d+)\]/)
            {
              $task_record->{'cpu'} = $1;
            }
            elsif ($line =~ /\[Map:([^\>]+)=>/)
            {
              $task_record->{'file'} = $1;
            }
            elsif ($line =~ /\[IOS:(\d+)\]/)
            {
              $io_start_time = $1;
            }
            elsif ($io_start_time > 0 && $line =~ /\[IOE:(\d+)\]/)
            {
              $io_time += ($1 - $io_start_time);
              $io_start_time = 0;
            }
            elsif ($line =~ /\[Completed:(\d+(?:\.\d+)?)\]/)
            {
              my $end_time = $1;
              $task_record->{'end'} = $end_time;
              # an IO block still open at completion is counted as running
              # until the task ended
              if ($io_start_time > 0)
              {
                $io_time += ($end_time - $io_start_time);
              }
            }
          }
          close(TIN);
          # Calculate CPU time (total time - IO time)
          $task_record->{'cpu_time'} = $task_record->{'end'} - $task_record->{'start'} - $io_time;

          # We should now have the filename - use this to try to locate a
          # convert log for this item (assuming it is multimedia, which it may
          # not be)
          if (defined $task_record->{'file'} && $task_record->{'file'} =~ /\/([^\/]+)\.ts/)
          {
            my $filename_sans_extension = $1;
            my $convert_log = $results_path . '/convert-' . $filename_sans_extension . '.log';
            if (-f $convert_log)
            {
              print '[Reading and parsing convert log]... ';
              if (open(CLIN, '<:utf8', $convert_log))
              {
                my $max_percent = 0.00;
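                # Scan for encoder progress lines; hypothetical sketch of the
                # format matched (HandBrake-style output assumed):
                #   Encoding: task 1 of 1, 45.23 %
                # The highest percentage seen becomes the task's 'percom'.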
                while (my $line = <CLIN>)
                {
                  if ($line =~ /.*Encoding: task 1 of 1, (\d+\.\d\d) \%/)
                  {
                    my $percent = $1;
                    if ($percent > $max_percent)
                    {
                      $max_percent = $percent;
                    }
                  }
                }
                close(CLIN);
                $task_record->{'percom'} = $max_percent;
              }
              else
              {
                print STDERR "Warning! Failed to open log file for reading: " . $convert_log . "\n";
              }
            }
          }

          # Store this record
          $task_records->{$task_no} = $task_record;
        }
        else
        {
          die('Error! Failed to open file for reading: ' . $task_log_path);
        }
        print "Done!\n";
      }
    }
    closedir(DH);
  }
  else
  {
    die('Error! Failed to open directory for reading: ' . $results_path);
  }
  print ' - Processed ' . scalar(keys(%{$task_records})) . " records\n";

  # Generate compute-node records
  print ' * Generating compute node records... ';
  my $node_records = {}; # initialised so the worker loop below copes with an empty set
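  # A worker's identity is its host name plus the core reported in its task
  # log, e.g. 'node01#2' (hypothetical). Keying on both keeps multicore
  # workers distinct even though each one reports itself as 'W0'.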
  foreach my $taskno (sort keys %{$task_records})
  {
    my $task_record = $task_records->{$taskno};
    my $node_record = {'host'=>'', 'cpu'=>0, 'job' => '', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
    # - retrieve any existing record
    my $worker_id = $task_record->{'host'} . '#' . $task_record->{'cpu'};
    if (defined $node_records->{$worker_id})
    {
      $node_record = $node_records->{$worker_id};
    }
    if ($node_record->{'host'} eq '')
    {
      $node_record->{'host'} = $task_record->{'host'};
    }
    if ($node_record->{'cpu'} == 0)
    {
      $node_record->{'cpu'} = $task_record->{'cpu'};
    }
    if ($node_record->{'job'} eq '')
    {
      $node_record->{'job'} = $task_record->{'job'};
    }
    if ($node_record->{'start'} == 0 || $task_record->{'start'} < $node_record->{'start'})
    {
      $node_record->{'start'} = $task_record->{'start'};
    }
    if ($node_record->{'end'} == 0 || $task_record->{'end'} > $node_record->{'end'})
    {
      $node_record->{'end'} = $task_record->{'end'};
    }
    $node_record->{'cpu_time'} += $task_record->{'cpu_time'};
    # - store it
    $node_records->{$worker_id} = $node_record;
  }
  print "Done!\n";

  # Write out CSV of all information
  my $report_csv_path = $results_path . '/timing.csv';
  if (open(CSVOUT, '>:utf8', $report_csv_path))
  {
    my $row_counter = 1;
    # Header
    print CSVOUT "id,name,hostname,start,end,cputime,dl,pid,filename,percom\n";
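    # Column key (read from the rows written below): id=row number, name=M0
    # for the master / W<n> for workers / T<n> for tasks, dl=data locality
    # flag, pid=row id of the parent record, percom=percent of any conversion
    # completed ('NA' where no convert log applied)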
    # Master Record
    print CSVOUT $row_counter . ',M0,' . $job_record->{'host'} . ',' . $job_record->{'start'} . ',' . $job_record->{'end'} . ',' . ($job_record->{'cpu_time'} / 1000) . ",0,0,NA,NA\n";
    $row_counter++;
    # For each compute node record
    my $known_workers = {};
    foreach my $worker_id (natsort(keys(%{$node_records})))
    {
      my $node_record = $node_records->{$worker_id};
      my $node_id = $row_counter;
      $row_counter++;
      my $csv_worker_id = 'W' . $node_record->{'cpu'};
      # Ensure we haven't used this id before - this should never trigger for
      # multicore CPUs, but will for clusters (as nearly all nodes will report
      # themselves as 'W0')
      if (defined $known_workers->{$csv_worker_id})
      {
        # Find a different worker id as this one is already in use
        my $counter = 0;
        $csv_worker_id = 'W' . $counter;
        while (defined $known_workers->{$csv_worker_id})
        {
          $counter++;
          $csv_worker_id = 'W' . $counter;
        }
      }
      $known_workers->{$csv_worker_id} = 1;
      print CSVOUT $node_id . ',' . $csv_worker_id . ',' . $node_record->{'host'} . ',' . $node_record->{'start'} . ',' . $node_record->{'end'} . ',' . $node_record->{'cpu_time'} . ",0,1,NA,NA\n";
      # List the child task records
      foreach my $taskno (sort keys %{$task_records})
      {
        my $task_record = $task_records->{$taskno};
        if ($task_record->{'host'} . '#' . $task_record->{'cpu'} eq $worker_id)
        {
          print CSVOUT $row_counter . ',T' . ($task_record->{'task'} + 0) . ',' . $task_record->{'host'} . ',' . $task_record->{'start'} . ',' . $task_record->{'end'} . ',' . $task_record->{'cpu_time'} . ',' . $task_record->{'data_locality'} . ',' . $node_id . ',' . $task_record->{'file'} . ',' . $task_record->{'percom'} . "\n";
          $row_counter++;
        }
      }
    }
    close(CSVOUT);
  }
  else
  {
    die('Error! Failed to open file for writing: ' . $report_csv_path);
  }
}

1;