source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_report.pl@28016

Last change on this file since 28016 was 28016, checked in by jmt12, 11 years ago

Allow the hadoop report generator to parse start and end times expressed as doubles (with microtime) where it previously expected longs

  • Property svn:executable set to *
File size: 10.1 KB
#!/usr/bin/perl

use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # Ensure Greenstone Perl locations are in INC
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # We'll need the Perl version number
  my ($version_number) = `perl-version.pl`;
  if (defined $ENV{'GSDLEXTS'})
  {
    my @extensions = split(/:/, $ENV{'GSDLEXTS'});
    foreach my $e (@extensions)
    {
      my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
      unshift (@INC, $ext_prefix . '/perllib');
      unshift (@INC, $ext_prefix . '/perllib/cpan');
      unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}

use Devel::Peek;
use Sort::Key::Natural qw(natsort);
use Time::Local;
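
# Time::Local converts the yy/mm/dd hh:mm:ss timestamps in hadoop.log into
# epoch seconds, and Sort::Key::Natural gives natural-order sorting of the
# worker ids when the CSV is written; Devel::Peek is not used below and is
# presumably a debugging leftover.
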
print "\n===== GS Hadoop Report =====\n";
print "Generate a report (CSV) from the output of a parallel processing Green-\n";
print "stone import, suitable for feeding into the Gantt chart generator.\n";
print "============================\n\n";

# Configuration
if (!defined $ARGV[0] || !-d $ARGV[0])
{
  die("usage: hadoop_report.pl <path to results>\n");
}
my $results_path = $ARGV[0];

# Read in hadoop.log and parse top level record
print ' * Reading and parsing "hadoop.log"... ';
my $job_record = {'host'=>'', 'job'=>'', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
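# Note: 'start' and 'end' are epoch seconds, but 'cpu_time' comes from
# Hadoop's "CPU time spent (ms)" counter and so is in milliseconds; it is
# divided by 1000 when the master record is written out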
my $hadoop_log_path = $results_path . '/hadoop.log';
if (open(HLIN, '<:utf8', $hadoop_log_path))
{
  while (my $line = <HLIN>)
  {
    if ($line =~ /host:(.+)/)
    {
      $job_record->{'host'} = $1;
    }
    elsif ($line =~ /Running job: job_(\d+_\d+)/)
    {
      $job_record->{'job'} = $1;
    }
    elsif ($line =~ /CPU time spent \(ms\)=(\d+)/)
    {
      $job_record->{'cpu_time'} = $1;
    }
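    # The first timestamp encountered is treated as the job start, and the
    # latest timestamp seen anywhere in the log as the job end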
    elsif ($job_record->{'start'} == 0 && $line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
    {
      $job_record->{'start'} = timelocal($6, $5, $4, $3, ($2 - 1), $1);
    }
    elsif ($line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
    {
      my $end = timelocal($6, $5, $4, $3, ($2 - 1), $1);
      if ($end > $job_record->{'end'})
      {
        $job_record->{'end'} = $end;
      }
    }
  }
  close(HLIN);
  if ($job_record->{'start'} == 0 || $job_record->{'end'} == 0)
  {
    die('Error! Failed to parse timing information from log: ' . $hadoop_log_path);
  }
}
else
{
  die('Error! Failed to open file for reading: ' . $hadoop_log_path);
}
print "Done!\n";

# Read in data_locality.csv (will be matched to task logs)
my $data_was_local = {};
my $data_locality_csv_path = $results_path . '/data_locality.csv';
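# Each row is expected to look like <task number>,<n>,<flag>; the final
# digit is stored as the task's data locality flag, while the middle field
# is not used here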
if (-f $data_locality_csv_path)
{
  print ' * Reading and parsing "data_locality.csv"... ';
  if (open(DLIN, '<:utf8', $data_locality_csv_path))
  {
    while (my $line = <DLIN>)
    {
      if ($line =~ /(\d+),\d,(\d)/)
      {
        $data_was_local->{$1} = $2;
      }
    }
    close(DLIN);
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
  }
  print "Done!\n";
}
else
{
  print " * Data locality not available or not applicable\n";
}

# Read in all task logs and parse task records
my $task_records = {};
print " * Locating task logs...\n";
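# Task logs are named import-hadoop-<job id>_m_<task number>_<attempt>.log,
# so the job and task numbers can be recovered from the filename itself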
if (opendir(DH, $results_path))
{
  my @files = readdir(DH);
  foreach my $file (sort @files)
  {
    if ($file =~ /import-hadoop-(\d+_\d+)_m_(\d+)_\d+\.log/)
    {
      my $job_no = $1;
      my $task_no = $2;
      my $is_data_local = 0;
      if (defined ($data_was_local->{$task_no}))
      {
        $is_data_local = $data_was_local->{$task_no};
      }
      my $task_record = {'host'=>'', 'cpu'=>0, 'job' => $job_no, 'task' => $task_no, 'start'=>0, 'end'=>0, 'cpu_time'=>0, 'data_locality'=>$is_data_local, 'file'=>'', 'percom'=>'NA'};
      print ' - Reading and parsing "' . $file . '"... ';
      my $task_log_path = $results_path . '/' . $file;
      my $io_time = 0;
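      # Time spent in IO is totalled from [IOS:<time>]...[IOE:<time>] marker
      # pairs in the task log, and is later subtracted from the task's
      # elapsed time to approximate its CPU time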
      if (open(TIN, '<:utf8', $task_log_path))
      {
        my $io_start_time = 0;
        while (my $line = <TIN>)
        {
          if ($line =~ /\[Started:(\d+(?:\.\d+)?)\]/)
          {
            $task_record->{'start'} = $1;
          }
          elsif ($line =~ /\[Host:([^\]]+)\]/)
          {
            $task_record->{'host'} = $1;
          }
          elsif ($line =~ /\[CPU:(\d+)\]/)
          {
            $task_record->{'cpu'} = $1;
          }
          elsif ($line =~ /\[Map:([^\>]+)=>/)
          {
            $task_record->{'file'} = $1;
          }
          elsif ($line =~ /\[IOS:(\d+)\]/)
          {
            $io_start_time = $1;
          }
          elsif ($io_start_time > 0 && $line =~ /\[IOE:(\d+)\]/)
          {
            $io_time += ($1 - $io_start_time);
            $io_start_time = 0;
          }
          elsif ($line =~ /\[Completed:(\d+(?:\.\d+)?)\]/)
          {
            my $end_time = $1;
            $task_record->{'end'} = $end_time;
            if ($io_start_time > 0)
            {
              $io_time += ($end_time - $io_start_time);
            }
          }
        }
        close(TIN);
        # Calculate CPU time (total time - IO time)
        $task_record->{'cpu_time'} = $task_record->{'end'} - $task_record->{'start'} - $io_time;

        # We should now have the filename - use it to try to locate a convert
        # log for this item (assuming it is multimedia, which it may not be)
        if (defined $task_record->{'file'} && $task_record->{'file'} =~ /\/([^\/]+)\.ts/)
        {
          my $filename_sans_extension = $1;
          my $convert_log = $results_path . '/convert-' . $filename_sans_extension . '.log';
          if (-f $convert_log)
          {
            print '[Reading and parsing convert log]... ';
            if (open(CLIN, '<:utf8', $convert_log))
            {
              my $max_percent = 0.00;
              while (my $line = <CLIN>)
              {
                if ($line =~ /.*Encoding: task 1 of 1, (\d+\.\d\d) \%/)
                {
                  my $percent = $1;
                  if ($percent > $max_percent)
                  {
                    $max_percent = $percent;
                  }
                }
              }
              close(CLIN);
              $task_record->{'percom'} = $max_percent;
            }
            else
            {
              print STDERR "Warning! Failed to open log file for reading: " . $convert_log . "\n";
            }
          }
        }
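        # 'percom' thus holds the highest percent-complete figure the encoder
        # reported for this item, and stays 'NA' when no convert log exists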

        # Store this record
        $task_records->{$task_no} = $task_record;
      }
      else
      {
        die('Error! Failed to open file for reading: ' . $task_log_path);
      }
      print "Done!\n";
    }
  }
  closedir(DH);
}
else
{
  die('Error! Failed to open directory for reading: ' . $results_path);
}
print ' - Processed ' . scalar(keys(%{$task_records})) . " records\n";

# Generate compute-node records
print ' * Generating compute node records... ';
my $node_records = {};
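# Tasks are grouped into one record per worker (host + CPU number); each
# record spans from the earliest member task's start to the latest task's
# end, and its cpu_time is the sum of the member tasks' cpu_time values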
foreach my $taskno (sort keys %{$task_records})
{
  my $task_record = $task_records->{$taskno};
  my $node_record = {'host'=>'', 'cpu'=>0, 'job' => '', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
  # - retrieve any existing record
  my $worker_id = $task_record->{'host'} . '#' . $task_record->{'cpu'};
  if (defined $node_records->{$worker_id})
  {
    $node_record = $node_records->{$worker_id};
  }
  if ($node_record->{'host'} eq '')
  {
    $node_record->{'host'} = $task_record->{'host'};
  }
  if ($node_record->{'cpu'} == 0)
  {
    $node_record->{'cpu'} = $task_record->{'cpu'};
  }
  if ($node_record->{'job'} eq '')
  {
    $node_record->{'job'} = $task_record->{'job'};
  }
  if ($node_record->{'start'} == 0 || $task_record->{'start'} < $node_record->{'start'})
  {
    $node_record->{'start'} = $task_record->{'start'};
  }
  if ($node_record->{'end'} == 0 || $task_record->{'end'} > $node_record->{'end'})
  {
    $node_record->{'end'} = $task_record->{'end'};
  }
  $node_record->{'cpu_time'} += $task_record->{'cpu_time'};
  # - store it
  $node_records->{$worker_id} = $node_record;
}
print "Done!\n";

# Write out CSV of all information
my $report_csv_path = $results_path . '/timing.csv';
if (open(CSVOUT, '>:utf8', $report_csv_path))
{
  my $row_counter = 1;
  # Header
  print CSVOUT "id,name,hostname,start,end,cputime,dl,pid,filename,percom\n";
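  # Column guide: 'dl' is the data locality flag and 'pid' is the row id of
  # the parent row (0 for the master, 1 for workers, the worker's id for
  # tasks). An illustrative example of the layout (values invented):
  #   1,M0,master,1371160000,1371160500,350.2,0,0,NA,NA
  #   2,W0,node01,1371160010.25,1371160490.75,420.1,0,1,NA,NA
  #   3,T1,node01,1371160010.25,1371160250.5,210.3,1,2,/data/video1.ts,100.00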
  # Master Record
  print CSVOUT $row_counter . ',M0,' . $job_record->{'host'} . ',' . $job_record->{'start'} . ',' . $job_record->{'end'} . ',' . ($job_record->{'cpu_time'} / 1000) . ",0,0,NA,NA\n";
  $row_counter++;
  # For each compute node record
  my $known_workers = {};
  foreach my $worker_id (natsort(keys(%{$node_records})))
  {
    my $node_record = $node_records->{$worker_id};
    my $node_id = $row_counter;
    $row_counter++;
    my $csv_worker_id = 'W' . $node_record->{'cpu'};
    # Ensure we haven't used this id before - this should never trigger for
    # multicore CPUs, but will for clusters (as nearly all nodes will report
    # themselves as 'W0')
    if (defined $known_workers->{$csv_worker_id})
    {
      # Find a different worker id as this one is already in use
      my $counter = 0;
      $csv_worker_id = 'W' . $counter;
      while (defined $known_workers->{$csv_worker_id})
      {
        $counter++;
        $csv_worker_id = 'W' . $counter;
      }
    }
    $known_workers->{$csv_worker_id} = 1;
    print CSVOUT $node_id . ',' . $csv_worker_id . ',' . $node_record->{'host'} . ',' . $node_record->{'start'} . ',' . $node_record->{'end'} . ',' . $node_record->{'cpu_time'} . ",0,1,NA,NA\n";
    # List the child task records
    foreach my $taskno (sort keys %{$task_records})
    {
      my $task_record = $task_records->{$taskno};
      if ($task_record->{'host'} . '#' . $task_record->{'cpu'} eq $worker_id)
      {
        print CSVOUT $row_counter . ',T' . ($task_record->{'task'} + 0) . ',' . $task_record->{'host'} . ',' . $task_record->{'start'} . ',' . $task_record->{'end'} . ',' . $task_record->{'cpu_time'} . ',' . $task_record->{'data_locality'} . ',' . $node_id . ',' . $task_record->{'file'} . ',' . $task_record->{'percom'} . "\n";
        $row_counter++;
      }
    }
  }
  close(CSVOUT);
}
else
{
  die('Error! Failed to open file for writing: ' . $report_csv_path);
}

print "Complete!\n\n";
exit;