source: gs2-extensions/parallel-building/trunk/src/bin/script/parse_task_info_from_hadoop_log.pl @ 28014

Last change on this file since 28014 was 28014, checked in by jmt12, 11 years ago

Remove tasks that have had data locality established from the array of pending tasks

  • Property svn:executable set to *
File size: 9.3 KB
#!/usr/bin/perl
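#
# Parses the Hadoop JobTracker log(s) for the job recorded in a results
# directory's hadoop.log, correlating map tasks, attempts, file splits and
# compute nodes, then writes a per-attempt data locality report
# (data_locality.csv) into the results directory.
#
# Typical invocation (illustrative path):
#   ./parse_task_info_from_hadoop_log.pl /tmp/hadoop-results -debug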

# Pragma
use strict;
use warnings;

# Configuration
my $debug = 0;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HADOOP_PREFIX not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HADOOP_PREFIX'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
  # Ensure Greenstone Perl locations are in INC
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # we'll need the perl version number
  my ($version_number) = `perl-version.pl`;
  chomp($version_number); # strip the trailing newline before using it in paths
  if (defined $ENV{'GSDLEXTS'})
  {
    my @extensions = split(/:/, $ENV{'GSDLEXTS'});
    foreach my $e (@extensions)
    {
      my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
      unshift (@INC, $ext_prefix . '/perllib');
      unshift (@INC, $ext_prefix . '/perllib/cpan');
      unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}

# Libraries (these depend on the @INC paths unshifted above)
use Sort::Key::Natural qw(natsort);

# Begin
print "===== Parse Hadoop Log =====\n";

# 0. Init
if (!defined $ARGV[0])
{
  die("usage: parse_task_info_from_hadoop_log.pl <results dir> [-debug]\n");
}
my $results_dir = $ARGV[0];
if (!-d $results_dir)
{
  die("Error! Can't find results directory: " . $results_dir . "\n");
}
print " Results directory: " . $results_dir . "\n";

if (defined $ARGV[1] && $ARGV[1] eq '-debug')
{
  $debug = 1;
}

# 1. Determine job ID
my $hadoop_log_path = &fileCat($results_dir, 'hadoop.log');
if (!-e $hadoop_log_path)
{
  die("Error! Hadoop log file cannot be found: " . $hadoop_log_path . "\n");
}
print " Hadoop log path: " . $hadoop_log_path . "\n";
print " * Determine JobID: ";
my $job_id;
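# - the job submission line in hadoop.log looks something like this
#   (illustrative; the exact logger prefix varies by Hadoop version):
#     INFO mapred.JobClient: Running job: job_201308291100_0001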
my $result = `grep "Running job:" "$hadoop_log_path"`;
if ($result =~ /Running job: job_(\d+_\d+)/)
{
  $job_id = $1;
}
else
{
  die("Error! Failed to locate JobID\n");
}
print $job_id . "\n";
# - we'll need the date to locate the appropriate log file
my $log_date_year = 0;
my $log_date_month = 0;
my $log_date_day = 0;
if ($job_id =~ /^(\d\d\d\d)(\d\d)(\d\d)/)
{
  $log_date_year = $1;
  $log_date_month = $2;
  $log_date_day = $3;
}
else
{
  die('Error! Failed to parse date from Job ID: ' . $job_id . "\n");
}

# 2. Determine user and system details
my $username = `whoami`;
chomp($username);
print " Username: " . $username . "\n";
my $hostname = `hostname`;
chomp($hostname);
print " Hostname: " . $hostname . "\n";
my $data_locality_report_path = &fileCat($results_dir, 'data_locality.csv');
print " Report path: " . $data_locality_report_path . "\n";

# 3. Parse log
print " * Parse JobTracker Log(s)...\n";
my $tid_2_splits = {};
my $tid_2_node = {};
my $aid_2_node = {};
my $job_complete = 0;
my $parsed_latest_log = 0;
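# Starting from the job's start date (encoded in the JobID), parse each
# daily-rolled JobTracker log in turn, advancing one day at a time, until
# either the job's completion entry is found or we reach the current
# (not yet rolled) log.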
while (!$job_complete && !$parsed_latest_log)
{
  # - determine appropriate job tracker log
  my $jobtracker_log_path_prefix = &fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log');
  my $jobtracker_log_path = sprintf('%s.%04d-%02d-%02d', $jobtracker_log_path_prefix, $log_date_year, $log_date_month, $log_date_day);
  # - maybe the log hasn't been rolled yet
  if (!-e $jobtracker_log_path)
  {
    $jobtracker_log_path = $jobtracker_log_path_prefix;
    # - nope, no applicable log found
    if (!-e $jobtracker_log_path)
    {
      die('Error! Hadoop JobTracker log file cannot be found: ' . $jobtracker_log_path . "\n");
    }
    else
    {
      $parsed_latest_log = 1;
    }
  }
  print " - parsing JobTracker log: " . $jobtracker_log_path . "\n";

  if (open(JTLIN, '<', $jobtracker_log_path))
  {
    my $line = '';
    while ($line = <JTLIN>)
    {
      # Tips provide a match between task and file splits
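      # - a matching line looks something like this (illustrative):
      #     ... tip:task_201308291100_0001_m_000000 has split on
      #     node:/default-rack/compute-node-1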
      if ($line =~ /tip:task_${job_id}(_m_\d+) has split on node:\/default-rack\/([^\.\r\n]+)/)
      {
        my $task_id = $job_id . $1;
        my $compute_node = $2;
        &debugPrint('found tip: ' . $task_id . ' => ' . $compute_node);
        if (!defined $tid_2_splits->{$task_id})
        {
          $tid_2_splits->{$task_id} = [$compute_node];
        }
        else
        {
          push(@{$tid_2_splits->{$task_id}}, $compute_node);
        }
      }
      # JobTracker (MAP) entries give us a mapping between task, attempt,
      # and compute node
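      # - e.g. (illustrative):
      #     ... Adding task (MAP) 'attempt_201308291100_0001_m_000000_0'
      #     ... tracker_compute-node-1.local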
      elsif ($line =~ /Adding task \(MAP\) 'attempt_${job_id}(_m_\d+)(_\d+)'.*tracker_([^\.]+)\.local/)
      {
        my $task_id = $job_id . $1;
        my $attempt_id = $job_id . $1 . $2;
        my $compute_node = $3;
        &debugPrint('found MAP: ' . $attempt_id . ' => ' . $compute_node);
        $aid_2_node->{$attempt_id} = {'compute_node' => $compute_node,
                                      'succeeded' => 0
                                     };
      }
      # Watch for attempt successes (so we can weed out failures)
      elsif ($line =~ /Task 'attempt_${job_id}(_m_\d+_\d+)' has completed .* successfully/)
      {
        my $attempt_id = $job_id . $1;
        &debugPrint('successful attempt: ' . $attempt_id);
        if (defined $aid_2_node->{$attempt_id})
        {
          $aid_2_node->{$attempt_id}->{'succeeded'} = 1;
        }
      }
      # And job completion... so we can keep parsing other log files as
      # necessary
      elsif ($line =~ /Job job_${job_id} has completed successfully\./)
      {
        $job_complete = 1;
      }
    }
    close(JTLIN);
  }
  else
  {
    die('Error! Failed to open JobTracker log for reading: ' . $jobtracker_log_path . "\n");
  }
  # Increment the day by one - unfortunately this leads to unpleasant date
  # maths to ensure the month and year roll over appropriately too
  $log_date_day++;
  # On to a new month?
  # February has 29 days in a leap year, 28 otherwise
  # Apr, Jun, Sep, Nov have 30; the rest have 31
  if ($log_date_month == 2 && $log_date_day > (&isLeapYear($log_date_year) ? 29 : 28))
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  elsif (($log_date_month == 4 || $log_date_month == 6 || $log_date_month == 9 || $log_date_month == 11) && $log_date_day > 30)
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  elsif ($log_date_day > 31)
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  # On to a new year?
  if ($log_date_month > 12)
  {
    $log_date_month = 1;
    $log_date_year++;
  }
}
print " Done\n";

if (!$job_complete)
{
  print "Warning! Failed to find a record of the job completing in the parsed logs.\n";
}

# 4. Write CSV of information
print " * Writing Job Information... ";
&debugPrint("AttemptID \tComputeNode\tSucceeded", 1);
foreach my $attempt_id (keys %{$aid_2_node})
{
  &debugPrint($attempt_id . "\t" . $aid_2_node->{$attempt_id}->{'compute_node'} . "\t" . $aid_2_node->{$attempt_id}->{'succeeded'});
}
&debugPrint("TaskID \tComputeNodeSplits");
my $split_counter = 0;
foreach my $task_id (keys %{$tid_2_splits})
{
  &debugPrint($task_id . "\t" . join(',', natsort(@{$tid_2_splits->{$task_id}})));
  $split_counter++;
}
&debugPrint(' * Number of split records: ' . $split_counter);
&debugPrint('');

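# The report contains one row per map attempt, for example (rows are
# illustrative only):
#   TaskNo,AttemptNo,Data Local,Compute Node,Splits
#   000000,0,1,"compute-node-1","compute-node-1,compute-node-2"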
# - open the CSV file and write out the combined information from above
if (open(CSVOUT, '>:utf8', $data_locality_report_path))
{
  print CSVOUT "TaskNo,AttemptNo,Data Local,Compute Node,Splits\n";
  foreach my $attempt_id (natsort(keys %{$aid_2_node}))
  {
    my ($job_id, $task_number, $attempt_number) = $attempt_id =~ /^(\d+_\d+)_m_(\d+)_(\d+)/;
    my $task_id = $job_id . '_m_' . $task_number;
    my $compute_node = $aid_2_node->{$attempt_id}->{'compute_node'};
    if (defined $tid_2_splits->{$task_id})
    {
      my @splits = @{$tid_2_splits->{$task_id}};
      my $data_local = 0;
      if (grep($_ eq $compute_node, @splits))
      {
        $data_local = 1;
      }
      print CSVOUT $task_number . "," . $attempt_number . "," . $data_local . ",\"" . $compute_node . "\",\"" . join(',', natsort(@splits)) . "\"\n";
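      # Data locality has now been established for this task, so remove
      # it from the hash of pending tasks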
      delete($tid_2_splits->{$task_id});
    }
    else
    {
      print "Warning! Missing data location information for task: " . $task_id . "\n";
    }
  }

  # Report on any other splits that were recorded in the log, but for unknown
  # reasons aren't matched with a 'successful' task
  foreach my $task_id (keys %{$tid_2_splits})
  {
    if (defined $tid_2_splits->{$task_id})
    {
      my ($job_id, $task_number) = $task_id =~ /^(\d+_\d+)_m_(\d+)/;
      my @splits = @{$tid_2_splits->{$task_id}};
      print CSVOUT $task_number . ",-1,-1,\"\",\"" . join(',', natsort(@splits)) . "\"\n";
    }
  }

  close(CSVOUT);
}
else
{
  die("Error! Failed to open file for writing: " . $data_locality_report_path . "\n");
}
print "Done\n";

# 5. Done
print "===== Complete! =====\n\n";
exit;

# Subs

sub debugPrint
{
  my ($msg, $force_newline) = @_;
  if ($debug)
  {
    if (defined $force_newline && $force_newline)
    {
      print "\n[debug] " . $msg . "\n";
    }
    else
    {
      print '[debug] ' . $msg . "\n";
    }
  }
}

sub fileCat
{
  my $path = join('/', @_);
  $path =~ s/\/\/+/\//g;
  return $path;
}
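# - e.g. &fileCat('/foo/', 'bar', 'baz') returns '/foo/bar/baz' (runs of
#   slashes are collapsed to one)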

## @function isLeapYear()
#
sub isLeapYear
{
  my ($year) = @_;
  if ($year % 400 == 0)
  {
    return 1;
  }
  elsif ($year % 100 == 0)
  {
    return 0;
  }
  elsif ($year % 4 == 0)
  {
    return 1;
  }
  return 0;
}
## isLeapYear() ##

1;