root/gs2-extensions/parallel-building/trunk/src/bin/script/parse_task_info_from_hadoop_log.pl @ 27588

Revision 27588, 9.0 KB (checked in by jmt12, 6 years ago)

Extend parser to support jobs that are split over several logs. Also ensure that split information is printed to CSV even for those tasks that appear to have failed (or completely lack attempt information).

  • Property svn:executable set to *
Line 
1#!/usr/bin/perl
2
3# Pragma
4use strict;
5use warnings;
6
7# Configuration
8my $debug = 0;
9
# Requires setup.bash to have been sourced
BEGIN
{
  # Fail fast unless the Greenstone/Hadoop environment has been sourced
  defined $ENV{'GSDLHOME'} or die "GSDLHOME not set\n";
  defined $ENV{'GSDLOS'} or die "GSDLOS not set\n";
  defined $ENV{'HADOOP_PREFIX'} or die "HADOOP_PREFIX not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n";
  defined $ENV{'HDFSHOST'} or die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n";
  defined $ENV{'HDFSPORT'} or die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n";
  # Ensure Greenstone Perl locations are in INC
  unshift(@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift(@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # we'll need the perl version number
  my ($version_number) = `perl-version.pl`;
  if (defined $ENV{'GSDLEXTS'})
  {
    # Each colon-separated extension contributes its own perllib directories
    foreach my $extension (split(/:/, $ENV{'GSDLEXTS'}))
    {
      my $extension_prefix = $ENV{'GSDLHOME'} . '/ext/' . $extension;
      unshift(@INC, $extension_prefix . '/perllib');
      unshift(@INC, $extension_prefix . '/perllib/cpan');
      unshift(@INC, $extension_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}
35
# Libraries (depends on the @INC unshifts in the BEGIN block above)
37use Sort::Key::Natural qw(natsort);
38
# Begin
print "===== Parse Hadoop Log =====\n";

# 0. Init
# - first argument (required) names the results directory to process
defined $ARGV[0]
  or die("usage: parse_task_info_from_hadoop_log.pl <results dir> [-debug]\n");
my $results_dir = $ARGV[0];
-d $results_dir
  or die("Error! Can't find results directory: " . $results_dir . "\n");
print " Results directory: " . $results_dir . "\n";

# - optional second argument turns on debug tracing
$debug = 1 if (defined $ARGV[1] && $ARGV[1] eq '-debug');
58
# 1. Determine job ID
my $hadoop_log_path = &fileCat($results_dir, 'hadoop.log');
-e $hadoop_log_path
  or die("Error! Hadoop log file cannot be found: " . $hadoop_log_path . "\n");
print " Hadoop log path: " . $hadoop_log_path . "\n";
print " * Determine JobID: ";
# - grep the Hadoop client log for the 'Running job' announcement and
#   capture the numeric job identifier from it
my $grep_output = `grep "Running job:" "$hadoop_log_path"`;
my ($job_id) = $grep_output =~ /Running job: job_(\d+_\d+)/;
defined $job_id
  or die("Error! Failed to locate JobID\n");
print $job_id . "\n";
# - we'll need the date to locate the appropriate log file; the job id is
#   prefixed with a YYYYMMDD timestamp
my ($log_date_year, $log_date_month, $log_date_day) = $job_id =~ /^(\d\d\d\d)(\d\d)(\d\d)/;
defined $log_date_year
  or die('Error! Failed to parse date from Job ID: ' . $job_id . "\n");
92
# 2. Determine user and system details
# - the JobTracker log filename embeds both the user and the host, so
#   capture them now (chomping the trailing newline from the backticks)
chomp(my $username = `whoami`);
print " Username: " . $username . "\n";
chomp(my $hostname = `hostname`);
print " Hostname: " . $hostname . "\n";
my $data_locality_report_path = &fileCat($results_dir, 'data_locality.csv');
print " Report path: " . $data_locality_report_path . "\n";
102
# 3. Parse log
print " * Parse JobTracker Log(s)...\n";
# - task id => list of compute nodes holding that task's file split(s)
my $tid_2_splits = {};
# - attempt id => {compute_node, succeeded} for every map attempt seen
#   (an unused $tid_2_node hash, never referenced again, was removed)
my $aid_2_node = {};
my $job_complete = 0;
my $parsed_latest_log = 0;
# A job's entries may be spread over several daily-rolled JobTracker logs,
# so keep advancing one day at a time until we either see the job complete
# or we have fallen through to the current (unrolled) log.
while (!$job_complete && !$parsed_latest_log)
{
  # - determine appropriate job tracker log
  my $jobtracker_log_path_prefix = &fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log');
  my $jobtracker_log_path = sprintf('%s.%04d-%02d-%02d', $jobtracker_log_path_prefix, $log_date_year, $log_date_month, $log_date_day);
  # - maybe the log hasn't been rolled yet
  if (!-e $jobtracker_log_path)
  {
    $jobtracker_log_path = $jobtracker_log_path_prefix;
    # - nope, no applicable log found
    if (!-e $jobtracker_log_path)
    {
      die('Error! Hadoop JobTracker log file cannot be found: ' . $jobtracker_log_path . "\n");
    }
    else
    {
      # - the unrolled log is the newest available, so stop after this pass
      $parsed_latest_log = 1;
    }
  }
  print " - parsing JobTracker log: " . $jobtracker_log_path . "\n";

  # - lexical filehandle (the original bareword JTLIN was a global)
  if (open(my $jtlin, '<', $jobtracker_log_path))
  {
    while (my $line = <$jtlin>)
    {
      # Tips provide a match between task and file splits
      if ($line =~ /tip:task_${job_id}(_m_\d+) has split on node:\/default-rack\/([^\.\r\n]+)/)
      {
        my $task_id = $job_id . $1;
        my $compute_node = $2;
        &debugPrint('found tip: ' . $task_id . ' => ' . $compute_node);
        if (!defined $tid_2_splits->{$task_id})
        {
          $tid_2_splits->{$task_id} = [$compute_node];
        }
        else
        {
          push(@{$tid_2_splits->{$task_id}}, $compute_node);
        }
      }
      # JobTracker (MAP) entries give us a mapping between task, attempt, and
      # compute node
      elsif ($line =~ /Adding task \(MAP\) 'attempt_${job_id}(_m_\d+)(_\d+)'.*tracker_([^\.]+).local/)
      {
        my $task_id = $job_id . $1;
        my $attempt_id = $job_id . $1 . $2;
        my $compute_node = $3;
        &debugPrint('found MAP: ' . $attempt_id . ' => ' . $compute_node);
        $aid_2_node->{$attempt_id} = {'compute_node' => $compute_node,
                                      'succeeded' => 0
                                     };
      }
      # Watch for attempt successes (so we can weed out failures)
      elsif ($line =~ /Task 'attempt_${job_id}(_m_\d+_\d+)' has completed .* successfully/)
      {
        my $attempt_id = $job_id . $1;
        &debugPrint('successful attempt: ' . $attempt_id);
        if (defined $aid_2_node->{$attempt_id})
        {
          $aid_2_node->{$attempt_id}->{'succeeded'} = 1;
        }
      }
      # And job completion... so we can keep parsing other log files as
      # necessary
      elsif ($line =~ /Job 'job_${job_id} has completed successfully\./)
      {
        $job_complete = 1;
      }
    }
    close($jtlin);
  }
  else
  {
    die('Error! Failed to open JobTracker log for reading: ' . $jobtracker_log_path . "\n");
  }
  # Advance to the next day's log, rolling month and year as required.
  # BUGFIX: the previous February test ((isLeapYear && day > 29) || day > 28)
  # rolled over on Feb 29 even in leap years, because the 'day > 28' clause
  # was not guarded by !isLeapYear. Use an explicit days-per-month table
  # (index 0 unused) with a leap-year adjustment for February instead.
  $log_date_day++;
  my @days_in_month = (0, 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31);
  my $month_length = $days_in_month[$log_date_month];
  $month_length = 29 if ($log_date_month == 2 && &isLeapYear($log_date_year));
  if ($log_date_day > $month_length)
  {
    $log_date_day = 1;
    $log_date_month++;
  }
  # On to a new year?
  if ($log_date_month > 12)
  {
    $log_date_month = 1;
    $log_date_year++;
  }
}
print "Done\n";

if (!$job_complete)
{
  print "Warning! Failed to parse in information for a complete job.\n";
}
221
# 4. Write CSV of information
print " * Writing Job Information... ";
&debugPrint("AttemptID                   \tComputeNode\tSucceeded", 1);
foreach my $attempt_id (keys %{$aid_2_node})
{
  &debugPrint($attempt_id . "\t" . $aid_2_node->{$attempt_id}->{'compute_node'} . "\t" . $aid_2_node->{$attempt_id}->{'succeeded'});
}
&debugPrint("TaskID                    \tComputeNodeSplits");
my $split_counter = 0;
foreach my $task_id (keys %{$tid_2_splits})
{
  &debugPrint($task_id . "\t" . join(',', natsort(@{$tid_2_splits->{$task_id}})));
  $split_counter++;
}
&debugPrint(' * Number of split records: ' . $split_counter);
&debugPrint('');

# - open the CSV file and write out the combined information from above
#   (lexical filehandle; the original bareword CSVOUT was a global)
if (open(my $csv_out, '>:utf8', $data_locality_report_path))
{
  print {$csv_out} "TaskNo,AttemptNo,Data Local,Compute Node,Splits\n";
  foreach my $attempt_id (natsort(keys %{$aid_2_node}))
  {
    my ($job_id, $task_number, $attempt_number) = $attempt_id =~ /^(\d+_\d+)_m_(\d+)_(\d+)/;
    my $task_id = $job_id . '_m_' . $task_number;
    my $compute_node = $aid_2_node->{$attempt_id}->{'compute_node'};
    # BUGFIX: a task may have no recorded split information (e.g. failed
    # tasks or missing tip entries); dereferencing the missing entry would
    # die under 'use strict', so fall back to an empty split list
    my @splits = defined $tid_2_splits->{$task_id} ? @{$tid_2_splits->{$task_id}} : ();
    # - an attempt is data-local when it ran on a node that holds the split
    my $data_local = (grep($_ eq $compute_node, @splits)) ? 1 : 0;
    print {$csv_out} $task_number . "," . $attempt_number . "," . $data_local . ",\"" . $compute_node . "\",\"" . join(',', natsort(@splits)) . "\"\n";
    # BUGFIX: undef() left the key in place with an undef value, which made
    # the leftover-splits loop below die dereferencing it; delete() removes
    # the entry entirely
    delete($tid_2_splits->{$task_id});
  }

  # Report on any other splits that were recorded in the log, but for unknown
  # reasons aren't matched with a 'successful' task
  foreach my $task_id (keys %{$tid_2_splits})
  {
    my ($job_id, $task_number) = $task_id =~ /^(\d+_\d+)_m_(\d+)/;
    my @splits = @{$tid_2_splits->{$task_id}};
    print {$csv_out} $task_number . ",-1,-1,\"\",\"" . join(',', natsort(@splits)) . "\"\n";
  }

  close($csv_out);
}
else
{
  die("Error! Failed to open file for writing: " . $data_locality_report_path);
}
print "Done\n";

# 5. Done
print "===== Complete! =====\n\n";
exit;
278
279# Subs
280
## @function debugPrint()
#
# Emit a message prefixed with '[debug]' when the file-scope $debug flag
# is on; a true second argument inserts a leading blank line first.
sub debugPrint
{
  my ($msg, $force_newline) = @_;
  # Silently ignore all output unless debugging was requested via -debug
  return unless $debug;
  my $prefix = ($force_newline ? "\n" : '') . '[debug] ';
  print $prefix . $msg . "\n";
}
296
## @function fileCat()
#
# Join any number of path components with '/', collapsing any run of
# consecutive slashes down to a single separator.
sub fileCat
{
  my $joined_path = join('/', @_);
  # tr///s with an empty replacement squeezes repeated '/' characters
  $joined_path =~ tr{/}{}s;
  return $joined_path;
}
303
## @function isLeapYear()
#
# Gregorian leap-year test: divisible by 4, except century years, which
# must additionally be divisible by 400. Returns 1 for a leap year, 0
# otherwise.
sub isLeapYear
{
  my ($year) = @_;
  return (($year % 4 == 0) && (($year % 100 != 0) || ($year % 400 == 0))) ? 1 : 0;
}
## isLeapYear() ##
324
3251;
Note: See TracBrowser for help on using the browser.