#!/usr/bin/perl

# Pragma
use strict;
use warnings;

# Configuration
my $debug = 0;

# Requires setup.bash to have been sourced
BEGIN
{
    die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
    die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
    die "HADOOP_PREFIX not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HADOOP_PREFIX'};
    die "HDFS HOST not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
    die "HDFS PORT not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
    # Ensure Greenstone Perl locations are in INC
    unshift(@INC, $ENV{'GSDLHOME'} . '/perllib');
    unshift(@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
    # We'll need the Perl version number
    my ($version_number) = $^V =~ /(\d+\.\d+\.\d+)/;
    if (defined $ENV{'GSDLEXTS'})
    {
        my @extensions = split(/:/, $ENV{'GSDLEXTS'});
        foreach my $e (@extensions)
        {
            my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
            unshift(@INC, $ext_prefix . '/perllib');
            unshift(@INC, $ext_prefix . '/perllib/cpan');
            unshift(@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
        }
    }
}

# Libraries (depend on the unshift calls above)
use Sort::Key::Natural qw(natsort);

# Begin
print "===== Parse Hadoop Log =====\n";

# 0. Init
if (!defined $ARGV[0])
{
    die("usage: parse_task_info_from_hadoop_log.pl <results_dir>\n");
}
my $results_dir = $ARGV[0];
if (!-d $results_dir)
{
    die("Error! Can't find results directory: " . $results_dir . "\n");
}
print " Results directory: " . $results_dir . "\n";

my $hadoop_log_path = &fileCat($results_dir, 'hadoop.log');
if (!-e $hadoop_log_path)
{
    die("Error! Hadoop log file cannot be found: " . $hadoop_log_path . "\n");
}
print " Hadoop log path: " . $hadoop_log_path . "\n";

my $username = `whoami`;
chomp($username);
print " Username: " . $username . "\n";

my $hostname = `hostname`;
chomp($hostname);
print " Hostname: " . $hostname . "\n";

my $jobtracker_log_path = &fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log');
if (!-e $jobtracker_log_path)
{
    die("Error! Hadoop JobTracker log file cannot be found: " . $jobtracker_log_path . "\n");
}
print " Jobtracker log path: " . $jobtracker_log_path . "\n";

my $data_locality_report_path = &fileCat($results_dir, 'data_locality.csv');
print " Report path: " . $data_locality_report_path . "\n";

# 1. Determine job ID
print " * Determine JobID: ";
my $job_id;
my $result = `grep "Running job:" "$hadoop_log_path"`;
if ($result =~ /Running job: job_(\d+_\d+)/)
{
    $job_id = $1;
}
else
{
    die("Error! Failed to locate JobID\n");
}
print $job_id . "\n";
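# The regular expressions in step 2 below target JobTracker log entries of
# roughly the following shapes. These samples are reconstructed from the
# patterns themselves, not copied from a real log, and the exact formatting
# varies by Hadoop version (job ID and node names are illustrative):
#
#   ... tip:task_201301010000_0001_m_000000 has split on node:/default-rack/compute-01.local
#   ... Adding task (MAP) 'attempt_201301010000_0001_m_000000_0' ... tracker_compute-01.local
#   ... Task 'attempt_201301010000_0001_m_000000_0' has completed ... successfully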
# 2. Parse log
print " * Parse JobTracker Log... ";
my $tid_2_splits = {};
my $tid_2_node = {};
my $aid_2_node = {};
if (open(JTLIN, '<', $jobtracker_log_path))
{
    my $line = '';
    while ($line = <JTLIN>)
    {
        # Tips provide a match between task and file splits
        if ($line =~ /tip:task_${job_id}(_m_\d+) has split on node:\/default-rack\/([^\.]+)\.local/)
        {
            my $task_id = $job_id . $1;
            my $compute_node = $2;
            &debugPrint('found tip: ' . $task_id . ' => ' . $compute_node);
            if (!defined $tid_2_splits->{$task_id})
            {
                $tid_2_splits->{$task_id} = [$compute_node];
            }
            else
            {
                push(@{$tid_2_splits->{$task_id}}, $compute_node);
            }
        }
        # JobTracker (MAP) entries give us a mapping between task, attempt, and
        # compute node
        if ($line =~ /Adding task \(MAP\) 'attempt_${job_id}(_m_\d+)(_\d+)'.*tracker_([^\.]+)\.local/)
        {
            my $task_id = $job_id . $1;
            my $attempt_id = $job_id . $1 . $2;
            my $compute_node = $3;
            &debugPrint('found MAP: ' . $attempt_id . ' => ' . $compute_node);
            $aid_2_node->{$attempt_id} = {'compute_node' => $compute_node,
                                          'succeeded' => 0};
        }
        # Watch for attempt successes (so we can weed out failures)
        if ($line =~ /Task 'attempt_${job_id}(_m_\d+_\d+)' has completed .* successfully/)
        {
            my $attempt_id = $job_id . $1;
            &debugPrint('successful attempt: ' . $attempt_id);
            if (defined $aid_2_node->{$attempt_id})
            {
                $aid_2_node->{$attempt_id}->{'succeeded'} = 1;
            }
        }
    }
    close(JTLIN);
}
else
{
    die("Error! Failed to open JobTracker log for reading: " . $jobtracker_log_path . "\n");
}
print "Done\n";

# 3. Write CSV of information
print " * Writing Job Information... ";
&debugPrint("AttemptID\tComputeNode\tSucceeded");
foreach my $attempt_id (keys %{$aid_2_node})
{
    &debugPrint($attempt_id . "\t" . $aid_2_node->{$attempt_id}->{'compute_node'} . "\t" . $aid_2_node->{$attempt_id}->{'succeeded'});
}
&debugPrint("TaskID\tComputeNodeSplits");
foreach my $task_id (keys %{$tid_2_splits})
{
    &debugPrint($task_id . "\t" . join(',', natsort(@{$tid_2_splits->{$task_id}})));
}

# - open the CSV file and write out the combined information from above
if (open(CSVOUT, '>:utf8', $data_locality_report_path))
{
    print CSVOUT "TaskNo,AttemptNo,Data Local,Compute Node,Splits\n";
    foreach my $attempt_id (natsort(keys %{$aid_2_node}))
    {
        # Note: this lexical $job_id shadows the outer one within the loop body
        my ($job_id, $task_number, $attempt_number) = $attempt_id =~ /^(\d+_\d+)_m_(\d+)_(\d+)/;
        my $task_id = $job_id . '_m_' . $task_number;
        my $compute_node = $aid_2_node->{$attempt_id}->{'compute_node'};
        my @splits = @{$tid_2_splits->{$task_id}};
        # An attempt is data local if it ran on a compute node that holds one
        # of the task's file splits
        my $data_local = 'N';
        if (grep($_ eq $compute_node, @splits))
        {
            $data_local = 'Y';
        }
        print CSVOUT $task_number . "," . $attempt_number . "," . $data_local . "," . $compute_node . ",\"" . join(',', natsort(@splits)) . "\"\n";
    }
    close(CSVOUT);
}
else
{
    die("Error! Failed to open file for writing: " . $data_locality_report_path . "\n");
}
print "Done\n";

# 4. Done
print "===== Complete! =====\n\n";

exit;

# Subs

sub debugPrint
{
    my ($msg) = @_;
    if ($debug)
    {
        print '[debug] ' . $msg . "\n";
    }
}

sub fileCat
{
    my $path = join('/', @_);
    $path =~ s/\/\/+/\//g;
    return $path;
}
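# Example invocation (hypothetical paths), assuming setup.bash has been
# sourced so that HADOOP_PREFIX and friends are defined:
#
#   ./parse_task_info_from_hadoop_log.pl /tmp/gsimport-results
#
# This would write /tmp/gsimport-results/data_locality.csv looking something
# like the following (values illustrative):
#
#   TaskNo,AttemptNo,Data Local,Compute Node,Splits
#   000000,0,Y,compute-01,"compute-01,compute-02"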