source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_report.pl@ 27684

Last change on this file since 27684 was 27684, checked in by jmt12, 11 years ago

Adding natural sorting into report generation - so also needed to add INC building code typical of GS scripts

  • Property svn:executable set to *
File size: 8.6 KB
Line 
1#!/usr/bin/perl
2
3use strict;
4use warnings;
5
6# Requires setup.bash to have been sourced
7BEGIN
8{
9 die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
10 die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
11 # Ensure Greenstone Perl locations are in INC
12 unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
13 unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
14 # we'll need the perl version number
15 my ($version_number) = `perl-version.pl`;
16 if (defined $ENV{'GSDLEXTS'})
17 {
18 my @extensions = split(/:/,$ENV{'GSDLEXTS'});
19 foreach my $e (@extensions)
20 {
21 my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
22 unshift (@INC, $ext_prefix . '/perllib');
23 unshift (@INC, $ext_prefix . '/perllib/cpan');
24 unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
25 }
26 }
27}
28
29use Devel::Peek;
30use Sort::Key::Natural qw(natsort);
31use Time::Local;
32
33print "\n===== GS Hadoop Report =====\n";
34print "Generate a report (CSV) from the output of a parallel processing Green-\n";
35print "stone import, suitable for feeding into the Gantt chart generator.\n";
36print "============================\n\n";
37
38# Configuration
39if (!defined $ARGV[0] || !-d $ARGV[0])
40{
41 die("usage: hadoop_report.pl <path to results>\n");
42}
43my $results_path = $ARGV[0];
44
45# Read in hadoop.log and parse top level record
46print ' * Reading and parsing "hadoop.log"... ';
47my $job_record = {'host'=>'', 'job'=>'', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
48my $hadoop_log_path = $results_path . '/hadoop.log';
49if (open(HLIN, '<:utf8', $hadoop_log_path))
50{
51 while (my $line = <HLIN>)
52 {
53 if ($line =~ /host:(.+)/)
54 {
55 $job_record->{'host'} = $1;
56 }
57 elsif ($line =~ /Running job: job_(\d+_\d+)/)
58 {
59 $job_record->{'job'} = $1;
60 }
61 elsif ($line =~ /CPU time spent \(ms\)=(\d+)/)
62 {
63 $job_record->{'cpu_time'} = $1;
64 }
65 elsif ($job_record->{'start'} == 0 && $line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
66 {
67 $job_record->{'start'} = timelocal($6, $5, $4, $3, ($2 - 1), $1);
68 }
69 elsif ($line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
70 {
71 my $end = timelocal($6, $5, $4, $3, ($2 - 1), $1);
72 if ($end > $job_record->{'end'})
73 {
74 $job_record->{'end'} = $end;
75 }
76 }
77 }
78 close(HLIN);
79 if ($job_record->{'start'} == 0 || $job_record->{'end'} == 0)
80 {
81 die('Error! Failed to parse timing information from log: ' . $hadoop_log_path);
82 }
83}
84else
85{
86 die('Error! Failed to open file for reading: ' . $hadoop_log_path);
87}
88print "Done!\n";
89
90# Read in data_locality.csv (will be matched to task logs)
91print ' * Reading and parsing "data_locality.csv"... ';
92my $data_was_local = {};
93my $data_locality_csv_path = $results_path . '/data_locality.csv';
94if (open(DLIN, '<:utf8', $data_locality_csv_path))
95{
96 while (my $line = <DLIN>)
97 {
98 if ($line =~ /(\d+),\d,(\d)/)
99 {
100 $data_was_local->{$1} = $2;
101 }
102 }
103 close(DLIN);
104}
105else
106{
107 die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
108}
109print "Done!\n";
110
111# Read in all task logs and parse task records
112my $task_records;
113print " * Locating task logs...\n";
114if (opendir(DH, $results_path))
115{
116 my @files = readdir(DH);
117 foreach my $file (sort @files)
118 {
119 if ($file =~ /import-hadoop-(\d+_\d+)_m_(\d+)_\d+\.log/)
120 {
121 my $job_no = $1;
122 my $task_no = $2;
123 my $is_data_local = $data_was_local->{$task_no};
124 my $task_record = {'host'=>'', 'cpu'=>0, 'job' => $job_no, 'task' => $task_no, 'start'=>0, 'end'=>0, 'cpu_time'=>0, 'data_locality'=>$is_data_local, 'file'=>''};
125 print ' - Reading and parsing "' . $file . '"... ';
126 my $task_log_path = $results_path . '/' . $file;
127 my $io_time = 0;
128 if (open(TIN, '<:utf8', $task_log_path))
129 {
130 my $io_start_time = 0;
131 while (my $line = <TIN>)
132 {
133 if ($line =~ /\[Started:(\d+)\]/)
134 {
135 $task_record->{'start'} = $1;
136 }
137 elsif ($line =~ /\[Host:([^\]]+)\]/)
138 {
139 $task_record->{'host'} = $1;
140 }
141 elsif ($line =~ /\[CPU:(\d+)\]/)
142 {
143 $task_record->{'cpu'} = $1;
144 }
145 elsif ($line =~ /\[Map:([^\]]+)=>1\]/)
146 {
147 $task_record->{'file'} = $1;
148 }
149 elsif ($line =~ /\[IOS:(\d+)\]/)
150 {
151 $io_start_time = $1;
152 }
153 elsif ($io_start_time > 0 && $line =~ /\[IOE:(\d+)\]/)
154 {
155 $io_time += ($1 - $io_start_time);
156 $io_start_time = 0;
157 }
158 elsif ($line =~ /\[Completed:(\d+)\]/)
159 {
160 my $end_time = $1;
161 $task_record->{'end'} = $end_time;
162 if ($io_start_time > 0)
163 {
164 $io_time += ($end_time - $io_start_time);
165 }
166 }
167 }
168 close(TIN);
169 # Calculate CPU time (total time - IO time)
170 $task_record->{'cpu_time'} = $task_record->{'end'} - $task_record->{'start'} - $io_time;
171 # Store this record
172 $task_records->{$task_no} = $task_record;
173 }
174 else
175 {
176 die('Error! Failed to open file for reading: ' . $task_log_path);
177 }
178 print "Done!\n";
179 }
180 }
181 close(DH);
182}
183else
184{
185 die('Error! Failed to open directory for reading: ' . $results_path);
186}
187print ' - Processed ' . scalar(keys(%{$task_records})) . " records\n";
188
189# Generate compute-node records
190print ' * Generating compute node records... ';
191my $node_records;
192foreach my $taskno (sort keys %{$task_records})
193{
194 my $task_record = $task_records->{$taskno};
195 my $node_record = {'host'=>'', 'cpu'=>0, 'job' => '', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
196 # - retrieve any existing record
197 my $worker_id = $task_record->{'host'} . '#' . $task_record->{'cpu'};
198 if (defined $node_records->{$worker_id})
199 {
200 $node_record = $node_records->{$worker_id};
201 }
202 if ($node_record->{'host'} eq '')
203 {
204 $node_record->{'host'} = $task_record->{'host'};
205 }
206 if ($node_record->{'cpu'} == 0)
207 {
208 $node_record->{'cpu'} = $task_record->{'cpu'};
209 }
210 if ($node_record->{'job'} eq '')
211 {
212 $node_record->{'job'} = $task_record->{'job'};
213 }
214 if ($node_record->{'start'} == 0 || $task_record->{'start'} < $node_record->{'start'})
215 {
216 $node_record->{'start'} = $task_record->{'start'};
217 }
218 if ($node_record->{'end'} == 0 || $task_record->{'end'} > $node_record->{'end'})
219 {
220 $node_record->{'end'} = $task_record->{'end'};
221 }
222 $node_record->{'cpu_time'} += $task_record->{'cpu_time'};
223 # - store it
224 $node_records->{$worker_id} = $node_record;
225}
226print "Done!\n";
227
228# Write out CSV of all information
229my $report_csv_path = $results_path . '/timing.csv';
230if (open(CSVOUT, '>:utf8', $report_csv_path))
231{
232 my $row_counter = 1;
233 # Header
234 print CSVOUT "id,name,hostname,start,end,cputime,dl,pid,filename\n";
235 # Master Record
236 print CSVOUT $row_counter . ',M0,' . $job_record->{'host'} . ',' . $job_record->{'start'} . ',' . $job_record->{'end'} . ',' . ($job_record->{'cpu_time'} / 1000) . ",0,0,NA\n";
237 $row_counter++;
238 # For each compute node record
239 my $known_workers = {};
240 foreach my $worker_id (natsort(keys(%{$node_records})))
241 {
242 my $node_record = $node_records->{$worker_id};
243 my $node_id = $row_counter;
244 $row_counter++;
245 my $csv_worker_id = 'W' . $node_record->{'cpu'};
246 # Ensure we haven't used this id before - this should never trigger for
247 # multicore CPUs, but will for clusters (as nearly all nodes will report
248 # themselves as 'W0')
249 if (defined $known_workers->{$csv_worker_id})
250 {
251 # Find a different worker id as this one is already in use
252 my $counter = 0;
253 $csv_worker_id = 'W' . $counter;
254 while (defined $known_workers->{$csv_worker_id})
255 {
256 $counter++;
257 $csv_worker_id = 'W' . $counter;
258 }
259 }
260 $known_workers->{$csv_worker_id} = 1;
261 print CSVOUT $node_id . ',' . $csv_worker_id . ',' . $node_record->{'host'} . ',' . $node_record->{'start'} . ',' . $node_record->{'end'} . ',' . $node_record->{'cpu_time'} . ",0,1,NA\n";
262 # List the child task records
263 foreach my $taskno (sort keys %{$task_records})
264 {
265 my $task_record = $task_records->{$taskno};
266 if ($task_record->{'host'} . '#' . $task_record->{'cpu'} eq $worker_id)
267 {
268 print CSVOUT $row_counter . ',T' . ($task_record->{'task'} + 0) . ',' . $task_record->{'host'} . ',' . $task_record->{'start'} . ',' . $task_record->{'end'} . ',' . $task_record->{'cpu_time'} . ',' . $task_record->{'data_locality'} . ',' . $node_id . ',' . $task_record->{'file'} . "\n";
269 $row_counter++;
270 }
271 }
272 }
273 close(CSVOUT);
274}
275else
276{
277 die('Error! Failed to open file for writing: ' . $report_csv_path);
278}
279
280print "Complete!\n\n";
281exit;
282
Note: See TracBrowser for help on using the repository browser.