source: gs2-extensions/parallel-building/trunk/src/bin/script/parse_task_info_from_hadoop_log.pl@27588

Last change on this file since 27588 was 27588, checked in by jmt12, 11 years ago

Extend parser to support jobs that are split over several logs. Also ensure that split information is printed to CSV even for those tasks that appear to have failed (or completely lack attempt information).

  • Property svn:executable set to *
File size: 9.0 KB
#!/usr/bin/perl

# Pragma
use strict;
use warnings;

# Configuration
my $debug = 0;
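
# Usage: parse_task_info_from_hadoop_log.pl <results dir> [-debug]
# where <results dir> is a directory containing the hadoop.log captured for
# the job; the report is written to data_locality.csv in the same directory
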
# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HADOOP_PREFIX not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HADOOP_PREFIX'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
  # Ensure Greenstone Perl locations are in INC
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # we'll need the perl version number
  my ($version_number) = `perl-version.pl`;
  chomp($version_number); # defensive: a trailing newline would corrupt the paths built below
  if (defined $ENV{'GSDLEXTS'})
  {
    my @extensions = split(/:/, $ENV{'GSDLEXTS'});
    foreach my $e (@extensions)
    {
      my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
      unshift (@INC, $ext_prefix . '/perllib');
      unshift (@INC, $ext_prefix . '/perllib/cpan');
      unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}

# Libraries (these depend on the @INC manipulation above)
use Sort::Key::Natural qw(natsort);

# Begin
print "===== Parse Hadoop Log =====\n";

# 0. Init
if (!defined $ARGV[0])
{
  die("usage: parse_task_info_from_hadoop_log.pl <results dir> [-debug]\n");
}
my $results_dir = $ARGV[0];
if (!-d $results_dir)
{
  die("Error! Can't find results directory: " . $results_dir . "\n");
}
print " Results directory: " . $results_dir . "\n";

if (defined $ARGV[1] && $ARGV[1] eq '-debug')
{
  $debug = 1;
}

# 1. Determine job ID
my $hadoop_log_path = &fileCat($results_dir, 'hadoop.log');
if (!-e $hadoop_log_path)
{
  die("Error! Hadoop log file cannot be found: " . $hadoop_log_path . "\n");
}
print " Hadoop log path: " . $hadoop_log_path . "\n";
print " * Determine JobID: ";
my $job_id;
my $result = `grep "Running job:" "$hadoop_log_path"`;
if ($result =~ /Running job: job_(\d+_\d+)/)
{
  $job_id = $1;
}
else
{
  die("Error! Failed to locate JobID\n");
}
print $job_id . "\n";
# - we'll need the date to locate the appropriate log file
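#   (a Hadoop 1.x job ID embeds the JobTracker start timestamp, formatted as
#   yyyyMMddHHmm, so its first eight digits give us a starting date; an
#   illustrative ID would be job_201312060021_0001)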
my $log_date_year = 0;
my $log_date_month = 0;
my $log_date_day = 0;
if ($job_id =~ /^(\d\d\d\d)(\d\d)(\d\d)/)
{
  $log_date_year = $1;
  $log_date_month = $2;
  $log_date_day = $3;
}
else
{
  die('Error! Failed to parse date from Job ID: ' . $job_id . "\n");
}

# 2. Determine user and system details
my $username = `whoami`;
chomp($username);
print " Username: " . $username . "\n";
my $hostname = `hostname`;
chomp($hostname);
print " Hostname: " . $hostname . "\n";
my $data_locality_report_path = &fileCat($results_dir, 'data_locality.csv');
print " Report path: " . $data_locality_report_path . "\n";

# 3. Parse log
print " * Parse JobTracker Log(s)...\n";
my $tid_2_splits = {}; # task ID => list of compute nodes holding a split
my $tid_2_node = {};   # task ID => compute node (currently unused)
my $aid_2_node = {};   # attempt ID => {compute_node, succeeded}
my $job_complete = 0;
my $parsed_latest_log = 0;
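# The JobTracker log is rolled daily, so a long-running job's entries can be
# spread across several files: starting from the date embedded in the job ID,
# parse one day's log at a time until the job-completion entry is found or
# the current (not-yet-rolled) log has been parsed.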
while (!$job_complete && !$parsed_latest_log)
{
  # - determine appropriate job tracker log
  my $jobtracker_log_path_prefix = &fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log');
  my $jobtracker_log_path = sprintf('%s.%04d-%02d-%02d', $jobtracker_log_path_prefix, $log_date_year, $log_date_month, $log_date_day);
  # - maybe the log hasn't been rolled yet
  if (!-e $jobtracker_log_path)
  {
    $jobtracker_log_path = $jobtracker_log_path_prefix;
    # - nope, no applicable log found
    if (!-e $jobtracker_log_path)
    {
      die('Error! Hadoop JobTracker log file cannot be found: ' . $jobtracker_log_path . "\n");
    }
    else
    {
      $parsed_latest_log = 1;
    }
  }
  print " - parsing JobTracker log: " . $jobtracker_log_path . "\n";

  if (open(JTLIN, '<', $jobtracker_log_path))
  {
    my $line = '';
    while ($line = <JTLIN>)
    {
      # Tips provide a match between task and file splits
      if ($line =~ /tip:task_${job_id}(_m_\d+) has split on node:\/default-rack\/([^\.\r\n]+)/)
      {
        my $task_id = $job_id . $1;
        my $compute_node = $2;
        &debugPrint('found tip: ' . $task_id . ' => ' . $compute_node);
        if (!defined $tid_2_splits->{$task_id})
        {
          $tid_2_splits->{$task_id} = [$compute_node];
        }
        else
        {
          push(@{$tid_2_splits->{$task_id}}, $compute_node);
        }
      }
      # JobTracker (MAP) entries give us a mapping between task, attempt, and
      # compute node
      elsif ($line =~ /Adding task \(MAP\) 'attempt_${job_id}(_m_\d+)(_\d+)'.*tracker_([^\.]+)\.local/)
      {
        my $task_id = $job_id . $1;
        my $attempt_id = $job_id . $1 . $2;
        my $compute_node = $3;
        &debugPrint('found MAP: ' . $attempt_id . ' => ' . $compute_node);
        $aid_2_node->{$attempt_id} = {'compute_node' => $compute_node,
                                      'succeeded' => 0};
      }
      # Watch for attempt successes (so we can weed out failures)
      elsif ($line =~ /Task 'attempt_${job_id}(_m_\d+_\d+)' has completed .* successfully/)
      {
        my $attempt_id = $job_id . $1;
        &debugPrint('successful attempt: ' . $attempt_id);
        if (defined $aid_2_node->{$attempt_id})
        {
          $aid_2_node->{$attempt_id}->{'succeeded'} = 1;
        }
      }
      # And job completion... so we can keep parsing other log files as
      # necessary (the quote is made optional here, as not every Hadoop
      # version appears to quote the job ID in this message)
      elsif ($line =~ /Job '?job_${job_id} has completed successfully\./)
      {
        $job_complete = 1;
      }
    }
    close(JTLIN);
  }
  else
  {
    die('Error! Failed to open JobTracker log for reading: ' . $jobtracker_log_path . "\n");
  }
  # Increment the day by one - unfortunately this leads to unpleasant date
  # maths to ensure month and year roll over appropriately too
  $log_date_day++;
  # On to a new month?
  # Leap Year Feb > 29 otherwise Feb > 28
  # Apr, Jun, Sep, Nov > 30
  # Rest > 31
  # (note the explicit non-leap-year guard: without it a leap-year Feb 29
  # would incorrectly roll over to March)
  if ($log_date_month == 2 && ((&isLeapYear($log_date_year) && $log_date_day > 29) || (!&isLeapYear($log_date_year) && $log_date_day > 28)))
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  elsif (($log_date_month == 4 || $log_date_month == 6 || $log_date_month == 9 || $log_date_month == 11) && $log_date_day > 30)
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  elsif ($log_date_day > 31)
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  # On to a new year?
  if ($log_date_month > 12)
  {
    $log_date_month = 1;
    $log_date_year++;
  }
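  # NOTE: a minimal alternative sketch - the increment and roll-over above
  # could instead lean on Perl's core POSIX module, since mktime() normalises
  # out-of-range days into the following month/year (leap years included):
  #
  #   use POSIX qw(mktime strftime);
  #   my $t = mktime(0, 0, 12, $log_date_day + 1, $log_date_month - 1, $log_date_year - 1900);
  #   ($log_date_year, $log_date_month, $log_date_day) = split(/-/, strftime('%Y-%m-%d', localtime($t)));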
}
print "Done\n";

if (!$job_complete)
{
  print "Warning! Failed to parse information for a complete job.\n";
}

# 4. Write CSV of information
print " * Writing Job Information... ";
&debugPrint("AttemptID \tComputeNode\tSucceeded", 1);
foreach my $attempt_id (keys %{$aid_2_node})
{
  &debugPrint($attempt_id . "\t" . $aid_2_node->{$attempt_id}->{'compute_node'} . "\t" . $aid_2_node->{$attempt_id}->{'succeeded'});
}
&debugPrint("TaskID \tComputeNodeSplits");
my $split_counter = 0;
foreach my $task_id (keys %{$tid_2_splits})
{
  &debugPrint($task_id . "\t" . join(',', natsort(@{$tid_2_splits->{$task_id}})));
  $split_counter++;
}
&debugPrint(' * Number of split records: ' . $split_counter);
&debugPrint('');

# - open the CSV file and write out the combined information from above
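# - an attempt counts as "data local" when the compute node it ran on is one
#   of the nodes recorded as holding a split of its task's input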
if (open(CSVOUT, '>:utf8', $data_locality_report_path))
{
  print CSVOUT "TaskNo,AttemptNo,Data Local,Compute Node,Splits\n";
  foreach my $attempt_id (natsort(keys %{$aid_2_node}))
  {
    my ($attempt_job_id, $task_number, $attempt_number) = $attempt_id =~ /^(\d+_\d+)_m_(\d+)_(\d+)/;
    my $task_id = $attempt_job_id . '_m_' . $task_number;
    my $compute_node = $aid_2_node->{$attempt_id}->{'compute_node'};
    # guard against attempts whose task never had split information recorded
    my @splits = defined $tid_2_splits->{$task_id} ? @{$tid_2_splits->{$task_id}} : ();
    my $data_local = 0;
    if (grep($_ eq $compute_node, @splits))
    {
      $data_local = 1;
    }
    print CSVOUT $task_number . "," . $attempt_number . "," . $data_local . ",\"" . $compute_node . "\",\"" . join(',', natsort(@splits)) . "\"\n";
    # delete (not undef) so the key doesn't linger for the loop below
    delete($tid_2_splits->{$task_id});
  }

  # Report on any other splits that were recorded in the log, but for unknown
  # reasons aren't matched with a 'successful' task
  foreach my $task_id (keys %{$tid_2_splits})
  {
    my ($task_job_id, $task_number) = $task_id =~ /^(\d+_\d+)_m_(\d+)/;
    my @splits = @{$tid_2_splits->{$task_id}};
    print CSVOUT $task_number . ",-1,-1,\"\",\"" . join(',', natsort(@splits)) . "\"\n";
  }

  close(CSVOUT);
}
else
{
  die("Error! Failed to open file for writing: " . $data_locality_report_path . "\n");
}
print "Done\n";

# 5. Done
print "===== Complete! =====\n\n";
exit;

# Subs

## @function debugPrint()
#
# Print a message only when the -debug flag was given; an optional second
# argument forces a leading newline
#
sub debugPrint
{
  my ($msg, $force_newline) = @_;
  if ($debug)
  {
    if (defined $force_newline && $force_newline)
    {
      print "\n[debug] " . $msg . "\n";
    }
    else
    {
      print '[debug] ' . $msg . "\n";
    }
  }
}
## debugPrint() ##

## @function fileCat()
#
# Join path components with '/' and collapse any duplicate slashes,
# e.g. fileCat('/a/', 'b', 'c') yields '/a/b/c'
#
sub fileCat
{
  my $path = join('/', @_);
  $path =~ s/\/\/+/\//g;
  return $path;
}
## fileCat() ##

## @function isLeapYear()
#
# Gregorian rule: divisible by 400 => leap; else by 100 => common;
# else by 4 => leap
#
sub isLeapYear
{
  my ($year) = @_;
  if ($year % 400 == 0)
  {
    return 1;
  }
  elsif ($year % 100 == 0)
  {
    return 0;
  }
  elsif ($year % 4 == 0)
  {
    return 1;
  }
  return 0;
}
## isLeapYear() ##

1;