source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_report.pl@ 27669

Last change on this file since 27669 was 27669, checked in by jmt12, 11 years ago

Sort compute nodes naturally before labelling them with incremental worker numbers (otherwise they go 0, 1, 10, 11 etc)

  • Property svn:executable set to *
File size: 7.8 KB
Line 
#!/usr/bin/perl

use strict;
use warnings;

use Devel::Peek;
use Sort::Key::Natural qw(natsort);
use Time::Local;

# Banner: explain what this tool produces before any parsing output appears.
print <<"BANNER";

===== GS Hadoop Report =====
Generate a report (CSV) from the output of a parallel processing Green-
stone import, suitable for feeding into the Gantt chart generator.
============================

BANNER
14
# Configuration: exactly one argument, the directory written by the parallel
# import run (must contain hadoop.log, data_locality.csv and the task logs).
die("usage: hadoop_report.pl <path to results>\n")
    unless defined $ARGV[0] && -d $ARGV[0];
my $results_path = $ARGV[0];
21
# Read in hadoop.log and build the top-level job record: submitting host,
# Hadoop job id, reported CPU time (ms), and the earliest/latest timestamps
# seen anywhere in the log (these bound the whole job on the Gantt chart).
print ' * Reading and parsing "hadoop.log"... ';
my $job_record = {'host'=>'', 'job'=>'', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
my $hadoop_log_path = $results_path . '/hadoop.log';
open(my $hadoop_log, '<:utf8', $hadoop_log_path)
    or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
while (my $log_line = <$hadoop_log>)
{
    if ($log_line =~ /host:(.+)/)
    {
        $job_record->{'host'} = $1;
    }
    elsif ($log_line =~ /Running job: job_(\d+_\d+)/)
    {
        $job_record->{'job'} = $1;
    }
    elsif ($log_line =~ /CPU time spent \(ms\)=(\d+)/)
    {
        $job_record->{'cpu_time'} = $1;
    }
    # The very first yy/mm/dd hh:mm:ss timestamp becomes the start time; note
    # that this branch deliberately consumes that line so it does not also
    # update the end time below.
    elsif ($job_record->{'start'} == 0 && $log_line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
    {
        $job_record->{'start'} = timelocal($6, $5, $4, $3, ($2 - 1), $1);
    }
    # Every later timestamp pushes the end time forward to the maximum seen.
    elsif ($log_line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
    {
        my $candidate_end = timelocal($6, $5, $4, $3, ($2 - 1), $1);
        if ($candidate_end > $job_record->{'end'})
        {
            $job_record->{'end'} = $candidate_end;
        }
    }
}
close($hadoop_log);
if ($job_record->{'start'} == 0 || $job_record->{'end'} == 0)
{
    die('Error! Failed to parse timing information from log: ' . $hadoop_log_path);
}
print "Done!\n";
66
# Read in data_locality.csv, mapping task number => locality flag (matched
# against task records later to fill the 'dl' column of the report).
print ' * Reading and parsing "data_locality.csv"... ';
my $data_was_local = {};
my $data_locality_csv_path = $results_path . '/data_locality.csv';
open(my $locality_csv, '<:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
while (my $csv_line = <$locality_csv>)
{
    # Expected row shape: <taskno>,<digit>,<local-flag>
    if ($csv_line =~ /(\d+),\d,(\d)/)
    {
        $data_was_local->{$1} = $2;
    }
}
close($locality_csv);
print "Done!\n";
87
# Read in all task logs and parse one task record per map attempt.  Logs are
# named import-hadoop-<job>_m_<task>_<attempt>.log and contain bracketed
# [Key:value] markers emitted by the parallel import wrapper.  CPU time is
# approximated as wall time minus the summed [IOS]/[IOE] IO intervals.
my $task_records;
print " * Locating task logs...\n";
if (opendir(DH, $results_path))
{
    my @files = readdir(DH);
    foreach my $file (sort @files)
    {
        if ($file =~ /import-hadoop-(\d+_\d+)_m_(\d+)_\d+\.log/)
        {
            my $job_no = $1;
            my $task_no = $2;
            # Default to 0 (not data-local) when the task has no entry in
            # data_locality.csv; otherwise undef leaks into the CSV 'dl'
            # column as an empty field with an uninitialized-value warning.
            my $is_data_local = defined $data_was_local->{$task_no} ? $data_was_local->{$task_no} : 0;
            my $task_record = {'host'=>'', 'cpu'=>0, 'job' => $job_no, 'task' => $task_no, 'start'=>0, 'end'=>0, 'cpu_time'=>0, 'data_locality'=>$is_data_local, 'file'=>''};
            print ' - Reading and parsing "' . $file . '"... ';
            my $task_log_path = $results_path . '/' . $file;
            my $io_time = 0;
            if (open(TIN, '<:utf8', $task_log_path))
            {
                my $io_start_time = 0;
                while (my $line = <TIN>)
                {
                    if ($line =~ /\[Started:(\d+)\]/)
                    {
                        $task_record->{'start'} = $1;
                    }
                    elsif ($line =~ /\[Host:([^\]]+)\]/)
                    {
                        $task_record->{'host'} = $1;
                    }
                    elsif ($line =~ /\[CPU:(\d+)\]/)
                    {
                        $task_record->{'cpu'} = $1;
                    }
                    elsif ($line =~ /\[Map:([^\]]+)=>1\]/)
                    {
                        $task_record->{'file'} = $1;
                    }
                    elsif ($line =~ /\[IOS:(\d+)\]/)
                    {
                        # Start of an IO interval
                        $io_start_time = $1;
                    }
                    elsif ($io_start_time > 0 && $line =~ /\[IOE:(\d+)\]/)
                    {
                        # End of an IO interval - accumulate elapsed IO time
                        $io_time += ($1 - $io_start_time);
                        $io_start_time = 0;
                    }
                    elsif ($line =~ /\[Completed:(\d+)\]/)
                    {
                        my $end_time = $1;
                        $task_record->{'end'} = $end_time;
                        # An IO interval still open at completion is counted
                        # up to the completion timestamp
                        if ($io_start_time > 0)
                        {
                            $io_time += ($end_time - $io_start_time);
                        }
                    }
                }
                close(TIN);
                # Calculate CPU time (total time - IO time)
                $task_record->{'cpu_time'} = $task_record->{'end'} - $task_record->{'start'} - $io_time;
                # Store this record
                $task_records->{$task_no} = $task_record;
            }
            else
            {
                die('Error! Failed to open file for reading: ' . $task_log_path);
            }
            print "Done!\n";
        }
    }
    # Fix: was close(DH). close() is for filehandles and fails (warning under
    # 'use warnings') on a handle opened with opendir(); closedir() is correct.
    closedir(DH);
}
else
{
    die('Error! Failed to open directory for reading: ' . $results_path);
}
print ' - Processed ' . scalar(keys(%{$task_records})) . " records\n";
165
# Generate compute-node records by folding together all task records that ran
# on the same host/CPU pairing: start is the minimum task start, end the
# maximum task end, and cpu_time the sum over the node's tasks.
print ' * Generating compute node records... ';
my $node_records;
foreach my $task_key (sort keys %{$task_records})
{
    my $task = $task_records->{$task_key};
    # Tasks are grouped by a composite "host#cpu" worker identifier.
    my $worker_id = $task->{'host'} . '#' . $task->{'cpu'};
    my $node = $node_records->{$worker_id};
    if (!defined $node)
    {
        $node = {'host'=>'', 'cpu'=>0, 'job' => '', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
    }
    # First task to mention each identity field wins.
    $node->{'host'} = $task->{'host'} if $node->{'host'} eq '';
    $node->{'cpu'}  = $task->{'cpu'}  if $node->{'cpu'} == 0;
    $node->{'job'}  = $task->{'job'}  if $node->{'job'} eq '';
    # Widen the node's timespan to cover this task.
    if ($node->{'start'} == 0 || $task->{'start'} < $node->{'start'})
    {
        $node->{'start'} = $task->{'start'};
    }
    if ($node->{'end'} == 0 || $task->{'end'} > $node->{'end'})
    {
        $node->{'end'} = $task->{'end'};
    }
    $node->{'cpu_time'} += $task->{'cpu_time'};
    # - store it
    $node_records->{$worker_id} = $node;
}
print "Done!\n";
204
# Write out CSV of all information: one header row, one master row, then a
# worker row per compute node followed immediately by that node's task rows.
# Row ids are assigned in emission order, so the interleaving below (node row,
# then its tasks, then the next node) is deliberate and order-sensitive.
my $report_csv_path = $results_path . '/timing.csv';
if (open(CSVOUT, '>:utf8', $report_csv_path))
{
  my $row_counter = 1;
  # Header
  print CSVOUT "id,name,hostname,start,end,cputime,dl,pid,filename\n";
  # Master Record (row 'M0'); Hadoop reports cpu_time in ms, the Gantt
  # generator expects seconds, hence the /1000
  print CSVOUT $row_counter . ',M0,' . $job_record->{'host'} . ',' . $job_record->{'start'} . ',' . $job_record->{'end'} . ',' . ($job_record->{'cpu_time'} / 1000) . ",0,0,NA\n";
  $row_counter++;
  # For each compute node record - natsort so workers come out 0,1,2,...,10
  # rather than the lexicographic 0,1,10,11,...
  my $known_workers = {};
  foreach my $worker_id (natsort(keys(%{$node_records})))
  {
    my $node_record = $node_records->{$worker_id};
    # Remember this node's row id: each child task row references it as 'pid'
    my $node_id = $row_counter;
    $row_counter++;
    my $csv_worker_id = 'W' . $node_record->{'cpu'};
    # Ensure we haven't used this id before - this should never trigger for
    # multicore CPUs, but will for clusters (as nearly all nodes will report
    # themselves as 'W0')
    if (defined $known_workers->{$csv_worker_id})
    {
      # Find a different worker id as this one is already in use
      my $counter = 0;
      $csv_worker_id = 'W' . $counter;
      while (defined $known_workers->{$csv_worker_id})
      {
        $counter++;
        $csv_worker_id = 'W' . $counter;
      }
    }
    $known_workers->{$csv_worker_id} = 1;
    print CSVOUT $node_id . ',' . $csv_worker_id . ',' . $node_record->{'host'} . ',' . $node_record->{'start'} . ',' . $node_record->{'end'} . ',' . $node_record->{'cpu_time'} . ",0,1,NA\n";
    # List the child task records (those whose host#cpu matches this worker);
    # '+ 0' strips leading zeros from the task number for the 'T<n>' label
    foreach my $taskno (sort keys %{$task_records})
    {
      my $task_record = $task_records->{$taskno};
      if ($task_record->{'host'} . '#' . $task_record->{'cpu'} eq $worker_id)
      {
        print CSVOUT $row_counter . ',T' . ($task_record->{'task'} + 0) . ',' . $task_record->{'host'} . ',' . $task_record->{'start'} . ',' . $task_record->{'end'} . ',' . $task_record->{'cpu_time'} . ',' . $task_record->{'data_locality'} . ',' . $node_id . ',' . $task_record->{'file'} . "\n";
        $row_counter++;
      }
    }
  }
  close(CSVOUT);
}
else
{
  die('Error! Failed to open file for writing: ' . $report_csv_path);
}

print "Complete!\n\n";
exit;
259
Note: See TracBrowser for help on using the repository browser.