#!/usr/bin/perl

# Parses the Hadoop JobTracker log(s) for the job recorded in
# <results dir>/hadoop.log and writes <results dir>/data_locality.csv, which
# lists - for every MAP attempt - the compute node it ran on, the nodes
# holding its input splits, and whether the attempt was data local.
#
# Usage: parse_task_info_from_hadoop_log.pl <results dir> [-debug]

# Pragma
use strict;
use warnings;

# Configuration
my $debug = 0;

# Requires setup.bash to have been sourced
BEGIN
{
    die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
    die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
    die "HADOOP_PREFIX not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HADOOP_PREFIX'};
    die "HDFS HOST not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
    die "HDFS PORT not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};

    # Ensure Greenstone Perl locations are in INC
    unshift(@INC, $ENV{'GSDLHOME'} . '/perllib');
    unshift(@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');

    # We'll need the perl version number. Only the first line of output is
    # used, and any trailing newline must be removed before the value is
    # embedded in a path.
    my ($version_number) = `perl-version.pl`;
    if (defined $version_number)
    {
        chomp($version_number);
    }

    if (defined $ENV{'GSDLEXTS'})
    {
        my @extensions = split(/:/, $ENV{'GSDLEXTS'});
        foreach my $e (@extensions)
        {
            my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
            unshift(@INC, $ext_prefix . '/perllib');
            unshift(@INC, $ext_prefix . '/perllib/cpan');
            # Skip the version specific path if the version is unknown
            if (defined $version_number && length($version_number))
            {
                unshift(@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
            }
        }
    }
}

# Libraries (depends on the unshift calls above)
use Sort::Key::Natural qw(natsort);

# Begin
print "===== Parse Hadoop Log =====\n";

# 0. Init
if (!defined $ARGV[0])
{
    die("usage: parse_task_info_from_hadoop_log.pl <results dir> [-debug]\n");
}
my $results_dir = $ARGV[0];
if (!-d $results_dir)
{
    die("Error! Can't find results directory: " . $results_dir . "\n");
}
print " Results directory: " . $results_dir . "\n";

if (defined $ARGV[1] && $ARGV[1] eq '-debug')
{
    $debug = 1;
}

# 1. Determine job ID
my $hadoop_log_path = fileCat($results_dir, 'hadoop.log');
if (!-e $hadoop_log_path)
{
    die("Error! Hadoop log file cannot be found: " . $hadoop_log_path . "\n");
}
print " Hadoop log path: " . $hadoop_log_path . "\n";

print " * Determine JobID: ";
my $job_id;
# - scan the log directly rather than shelling out to grep, so that a path
#   containing shell metacharacters cannot alter the command being run
if (open(my $hadoop_log_fh, '<', $hadoop_log_path))
{
    while (my $log_line = <$hadoop_log_fh>)
    {
        if ($log_line =~ /Running job: job_(\d+_\d+)/)
        {
            $job_id = $1;
            last;
        }
    }
    close($hadoop_log_fh);
}
else
{
    die("Error! Failed to open Hadoop log for reading: " . $hadoop_log_path . "\n");
}
if (!defined $job_id)
{
    die("Error! Failed to locate JobID\n");
}
print $job_id . "\n";

# - we'll need the date to locate the appropriate log file (the job ID starts
#   with the date in YYYYMMDD form)
my $log_date_year = 0;
my $log_date_month = 0;
my $log_date_day = 0;
if ($job_id =~ /^(\d\d\d\d)(\d\d)(\d\d)/)
{
    # - force numeric values so the later date arithmetic is unambiguous
    $log_date_year = int($1);
    $log_date_month = int($2);
    $log_date_day = int($3);
}
else
{
    die('Error! Failed to parse date from Job ID: ' . $job_id . "\n");
}

# 2. Determine user and system details
my $username = `whoami`;
chomp($username);
print " Username: " . $username . "\n";
my $hostname = `hostname`;
chomp($hostname);
print " Hostname: " . $hostname . "\n";
my $data_locality_report_path = fileCat($results_dir, 'data_locality.csv');
print " Report path: " . $data_locality_report_path . "\n";

# 3. Parse log
print " * Parse JobTracker Log(s)...\n";
my $tid_2_splits = {}; # task ID => [nodes holding a split for the task]
my $aid_2_node = {};   # attempt ID => {compute_node, succeeded}
my $job_complete = 0;
my $parsed_latest_log = 0;
while (!$job_complete && !$parsed_latest_log)
{
    # - determine appropriate job tracker log
    my $jobtracker_log_path_prefix = fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log');
    my $jobtracker_log_path = sprintf('%s.%04d-%02d-%02d', $jobtracker_log_path_prefix, $log_date_year, $log_date_month, $log_date_day);
    # - maybe the log hasn't been rolled yet
    if (!-e $jobtracker_log_path)
    {
        $jobtracker_log_path = $jobtracker_log_path_prefix;
        # - nope, no applicable log found
        if (!-e $jobtracker_log_path)
        {
            die('Error! Hadoop JobTracker log file cannot be found: ' . $jobtracker_log_path . "\n");
        }
        else
        {
            # - the unrolled log is the newest there is, so stop after it
            $parsed_latest_log = 1;
        }
    }
    print " - parsing JobTracker log: " . $jobtracker_log_path . "\n";

    if (open(my $jobtracker_fh, '<', $jobtracker_log_path))
    {
        while (my $line = <$jobtracker_fh>)
        {
            # Tips provide a match between task and file splits
            if ($line =~ /tip:task_${job_id}(_m_\d+) has split on node:\/default-rack\/([^\.\r\n]+)/)
            {
                my $task_id = $job_id . $1;
                my $compute_node = $2;
                debugPrint('found tip: ' . $task_id . ' => ' . $compute_node);
                if (!defined $tid_2_splits->{$task_id})
                {
                    $tid_2_splits->{$task_id} = [$compute_node];
                }
                else
                {
                    push(@{$tid_2_splits->{$task_id}}, $compute_node);
                }
            }
            # JobTracker (MAP) entries give us a mapping between task, attempt,
            # and compute node
            elsif ($line =~ /Adding task \(MAP\) 'attempt_${job_id}(_m_\d+)(_\d+)'.*tracker_([^\.]+).local/)
            {
                my $attempt_id = $job_id . $1 . $2;
                my $compute_node = $3;
                debugPrint('found MAP: ' . $attempt_id . ' => ' . $compute_node);
                $aid_2_node->{$attempt_id} = {'compute_node' => $compute_node,
                                              'succeeded' => 0};
            }
            # Watch for attempt successes (so we can weed out failures)
            elsif ($line =~ /Task 'attempt_${job_id}(_m_\d+_\d+)' has completed .* successfully/)
            {
                my $attempt_id = $job_id . $1;
                debugPrint('successful attempt: ' . $attempt_id);
                if (defined $aid_2_node->{$attempt_id})
                {
                    $aid_2_node->{$attempt_id}->{'succeeded'} = 1;
                }
            }
            # And job completion... so we can keep parsing other log files as
            # necessary
            elsif ($line =~ /Job job_${job_id} has completed successfully\./)
            {
                $job_complete = 1;
            }
        }
        close($jobtracker_fh);
    }
    else
    {
        die('Error! Failed to open JobTracker log for reading: ' . $jobtracker_log_path . "\n");
    }

    # Move on to the next day's log, rolling the month and year over as needed
    $log_date_day++;
    if ($log_date_day > daysInMonth($log_date_year, $log_date_month))
    {
        $log_date_day = 1;
        $log_date_month++;
    }
    # On to a new year?
    if ($log_date_month > 12)
    {
        $log_date_month = 1;
        $log_date_year++;
    }
}
print " Done\n";

if (!$job_complete)
{
    print "Warning! Failed to parse in information for a complete job.\n";
}

# 4. Write CSV of information
print " * Writing Job Information... ";
debugPrint("AttemptID \tComputeNode\tSucceeded", 1);
foreach my $attempt_id (keys %{$aid_2_node})
{
    debugPrint($attempt_id . "\t" . $aid_2_node->{$attempt_id}->{'compute_node'} . "\t" . $aid_2_node->{$attempt_id}->{'succeeded'});
}
debugPrint("TaskID \tComputeNodeSplits");
my $split_counter = 0;
foreach my $task_id (keys %{$tid_2_splits})
{
    debugPrint($task_id . "\t" . join(',', natsort(@{$tid_2_splits->{$task_id}})));
    $split_counter++;
}
debugPrint(' * Number of split records: ' . $split_counter);
debugPrint('');

# - open the CSV file and write out the combined information from above
if (open(my $csv_fh, '>:utf8', $data_locality_report_path))
{
    print $csv_fh "TaskNo,AttemptNo,Data Local,Compute Node,Splits\n";
    foreach my $attempt_id (natsort(keys %{$aid_2_node}))
    {
        my ($attempt_job_id, $task_number, $attempt_number) = $attempt_id =~ /^(\d+_\d+)_m_(\d+)_(\d+)/;
        my $task_id = $attempt_job_id . '_m_' . $task_number;
        my $compute_node = $aid_2_node->{$attempt_id}->{'compute_node'};
        if (defined $tid_2_splits->{$task_id})
        {
            my @splits = @{$tid_2_splits->{$task_id}};
            # - an attempt is data local when it ran on a node holding one of
            #   its splits
            my $data_local = 0;
            if (grep { $_ eq $compute_node } @splits)
            {
                $data_local = 1;
            }
            print $csv_fh $task_number . "," . $attempt_number . "," . $data_local . ",\"" . $compute_node . "\",\"" . join(',', natsort(@splits)) . "\"\n";
            # - remove the record so only unmatched splits remain afterwards;
            #   note a later attempt of the same task will then be reported as
            #   missing its data location information
            delete($tid_2_splits->{$task_id});
        }
        else
        {
            print "Warning! Missing data location information for task: " . $task_id . "\n";
        }
    }

    # Report on any other splits that were recorded in the log, but for
    # unknown reasons aren't matched with an attempt. Keys are sorted so the
    # output order is reproducible between runs.
    foreach my $task_id (natsort(keys %{$tid_2_splits}))
    {
        if (defined $tid_2_splits->{$task_id})
        {
            my ($task_number) = $task_id =~ /^\d+_\d+_m_(\d+)/;
            my @splits = @{$tid_2_splits->{$task_id}};
            print $csv_fh $task_number . ",-1,-1,\"\",\"" . join(',', natsort(@splits)) . "\"\n";
        }
    }
    # - buffered write errors only surface at close, so check it
    close($csv_fh) or die("Error! Failed to close file after writing: " . $data_locality_report_path . "\n");
}
else
{
    die("Error! Failed to open file for writing: " . $data_locality_report_path);
}
print "Done\n";

# 5. Done
print "===== Complete! =====\n\n";
exit;

# Subs

## @function debugPrint()
# Print a '[debug] ' prefixed message, but only when debugging is enabled.
# @param $msg           the message to print
# @param $force_newline when true, emit a newline before the message
#
sub debugPrint
{
    my ($msg, $force_newline) = @_;
    if ($debug)
    {
        if (defined $force_newline && $force_newline)
        {
            print "\n[debug] " . $msg . "\n";
        }
        else
        {
            print '[debug] ' . $msg . "\n";
        }
    }
}
## debugPrint() ##

## @function fileCat()
# Join the given path parts with '/' and collapse any run of slashes into one.
# @return the resulting path
#
sub fileCat
{
    my $path = join('/', @_);
    $path =~ s/\/\/+/\//g;
    return $path;
}
## fileCat() ##

## @function daysInMonth()
# @param $year  the (four digit) year, needed to size February
# @param $month the month number, 1 to 12
# @return the number of days in the given month
#
sub daysInMonth
{
    my ($year, $month) = @_;
    if ($month == 2)
    {
        return isLeapYear($year) ? 29 : 28;
    }
    # Apr, Jun, Sep, Nov
    if ($month == 4 || $month == 6 || $month == 9 || $month == 11)
    {
        return 30;
    }
    return 31;
}
## daysInMonth() ##

## @function isLeapYear()
# @param $year the (four digit) year to test
# @return 1 if the year is a leap year in the Gregorian calendar, 0 otherwise
#
sub isLeapYear
{
    my ($year) = @_;
    if ($year % 400 == 0)
    {
        return 1;
    }
    elsif ($year % 100 == 0)
    {
        return 0;
    }
    elsif ($year % 4 == 0)
    {
        return 1;
    }
    return 0;
}
## isLeapYear() ##

1;