#!/usr/bin/perl
# update_data_locality.pl
#
# Walks the results directories given on the command line, and for every
# directory containing a data_locality.csv, augments that CSV with the
# "original split" information recorded in hadoop.log (information that is
# otherwise lost when Split transforms are rewritten, e.g. when deliberately
# *preventing* data locality). The data-local flag is recomputed against the
# original splits and the old CSV is kept as data_locality.csv.bak.
#
# NOTE(review): this file appears to have been through an HTML-stripping pass
# that removed every angle-bracketed token (the <DLIN>/<MIN>/<HLIN> readline
# operators, the opening <Filename> tag in the manifest regex, and the
# usage-line placeholder). Those have been reconstructed below.

use strict;
use warnings;

use Cwd;
use Sort::Key::Natural qw(natsort);

print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

# Global debug flag, toggled by the -debug command-line option and read by
# printDebug().
our $debug = 0;

# Loop through all the arguments, descending into directories to process data
# locality files.
my $offset     = 0;
my $found_dirs = 0;
while (defined $ARGV[$offset]) {
    my $argument = $ARGV[$offset];
    # handle -option arguments
    if ($argument =~ /^-(.+)$/) {
        my $option = $1;
        if ($option eq 'debug') {
            $debug = 1;
        }
    }
    # handle directories (interpreted relative to the current working dir)
    else {
        my $path = getcwd() . '/' . $argument;
        if (-d $path) {
            $found_dirs++;
            searchForDataLocalityFiles($path);
        }
    }
    $offset++;
}
if ($found_dirs == 0) {
    printUsage('Missing results directory');
}

# Complete!
print "\n========================= Complete =========================\n\n";
exit;

################################################################################
################################# Functions ####################################
################################################################################

## @function searchForDataLocalityFiles()
#
# Recursively search the given directory for data_locality.csv files, calling
# updateDataLocalityForDirectory() on each directory that contains one.
# Hidden entries (leading '.') are skipped, which also avoids '.' and '..'.
#
# @param $dir absolute path of the directory to search
#
sub searchForDataLocalityFiles {
    my ($dir) = @_;
    # All of the following is *only* valid for directories
    if (-d $dir) {
        print ' * Searching for Data Locality information: ' . $dir . "\n";
        # Does this directory contain data_locality.csv?
        my $dl_csv_path = $dir . '/data_locality.csv';
        if (-f $dl_csv_path) {
            updateDataLocalityForDirectory($dir);
        }
        # Also descend into child directories
        opendir(my $dh, $dir)
            or die('Error! Failed to open directory for reading: ' . $dir);
        my @files = readdir($dh);
        closedir($dh);
        foreach my $file (@files) {
            next if $file =~ /^\./;
            my $path = $dir . '/' . $file;
            if (-d $path) {
                searchForDataLocalityFiles($path);
            }
        }
    }
}
## searchForDataLocalityFiles()

## @function updateDataLocalityForDirectory()
#
# Rewrite <dir>/data_locality.csv adding an OriginalSplits column (recovered
# from <dir>/hadoop.log) and recomputing the Data Local flag against those
# original splits. The previous CSV is preserved as data_locality.csv.bak.
# A CSV that already contains an OriginalSplits column is left untouched.
#
# @param $dir_prefix directory containing data_locality.csv and hadoop.log
#
sub updateDataLocalityForDirectory {
    my ($dir_prefix) = @_;

    # 0. Init
    # %{$data} maps hdfs filepath => record hashref with keys:
    #   taskid, attempt, node, splits, original_splits
    my $data = {};
    my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
    my $hadoop_log_path        = $dir_prefix . '/hadoop.log';

    # 1. Read in current data locality comma separated file
    print " * Read current file: data_locality.csv\n";
    open(my $dl_in, '<:utf8', $data_locality_csv_path)
        or die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
    my $record_count = 0;
    while (my $line = <$dl_in>) {
        printDebug($line);
        # If the line already indicates this csv has been updated (by the
        # presence of an "OriginalSplits" column) we don't need to do anything
        # else
        if ($line =~ /OriginalSplits/) {
            print " ! File already updated...\n";
            close($dl_in);
            return;
        }
        # Expected columns: TaskID,AttemptNo,DataLocal,"ComputeNode","Splits"
        if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/) {
            my $taskid       = $1;
            my $attempt_no   = $2;
            my $compute_node = $3;
            my $splits       = $4;
            # Use the taskid to look up the manifest file and therein the
            # filepath
            my $filepath = '';
            my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
            printDebug('searching for manifest file: ' . $manifest_path);
            open(my $manifest_in, '<:utf8', $manifest_path)
                or die('Failed to open manifest for reading: ' . $manifest_path);
            while (my $manifest_line = <$manifest_in>) {
                # Normalize any of the supported protocols to a plain "hdfs:"
                # prefix. NOTE(review): opening <Filename> tag reconstructed —
                # it was stripped along with the other angle-bracketed tokens.
                if ($manifest_line =~ /<Filename>(?:hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/) {
                    $filepath = 'hdfs:' . $1;
                }
            }
            close($manifest_in);
            printDebug('filepath: ' . $filepath);
            $data->{$filepath} = {'taskid'          => $taskid,
                                  'attempt'         => $attempt_no,
                                  'node'            => $compute_node,
                                  'splits'          => $splits,
                                  'original_splits' => ''};
            $record_count++;
        }
    }
    close($dl_in);
    print ' - read ' . $record_count . " records.\n";

    # 2. Read in hadoop log looking for split mapping information
    print " * Read split information: hadoop.log\n";
    open(my $hl_in, '<:utf8', $hadoop_log_path)
        or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
    $record_count = 0;
    my $filepath = '';
    while (my $line = <$hl_in>) {
        if ($line =~ /^Filepath:\s(.+)$/) {
            $filepath = $1;
        }
        elsif ($filepath ne '') {
            # this is the information that was unintentionally lost
            if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/) {
                my $original_splits = parseSplits($1);
                if (!defined $data->{$filepath}) {
                    die('Found split information for file not in data locality log: ' . $filepath);
                }
                $data->{$filepath}->{'original_splits'} = $original_splits;
                $filepath = '';
                $record_count++;
            }
        }
    }
    close($hl_in);
    print ' - updated ' . $record_count . " records.\n";

    # 3. Write out the updated data locality file including information about
    #    original splits (should we have found that information) and corrected
    #    data locality flag.
    # backup old
    my $backup_path = $data_locality_csv_path . '.bak';
    if (-f $backup_path) {
        unlink($backup_path);
    }
    rename($data_locality_csv_path, $backup_path)
        or die('Error! Failed to backup file: ' . $data_locality_csv_path);
    # write new
    print " * Writing out updated file: data_locality.csv\n";
    open(my $dl_out, '>:utf8', $data_locality_csv_path)
        or die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
    $record_count = 0;
    print $dl_out "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
    foreach my $filepath (keys %{$data}) {
        my $record       = $data->{$filepath};
        my $compute_node = $record->{'node'};
        # The task is data-local iff the compute node appears among the nodes
        # that held the original splits.
        my @dl_nodes   = split(',', $record->{'original_splits'});
        my $data_local = 0;
        foreach my $dl_node (@dl_nodes) {
            if ($dl_node eq $compute_node) {
                $data_local = 1;
            }
        }
        print $dl_out $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
        $record_count++;
    }
    close($dl_out);
    print ' - wrote ' . $record_count . " records\n";
}
## updateDataLocalityForDirectory()

## @function parseSplits()
#
# Split a raw comma-separated splits string into its parts and return them
# re-joined in natural sort order (so node lists compare consistently).
#
# @param  $raw_splits raw comma-separated list, e.g. "node2, node10, node1"
# @return canonical comma-separated, natural-sorted list
#
sub parseSplits {
    my ($raw_splits) = @_;
    my @splits = split(/\s*,\s*/, $raw_splits);
    return join(',', natsort(@splits));
}
## parseSplits()

## @function printDebug()
#
# Print a [DEBUG]-prefixed message, but only when the -debug flag was given.
#
sub printDebug {
    my ($msg) = @_;
    if ($debug) {
        chomp($msg);
        print '[DEBUG] ' . $msg . "\n";
    }
}
## printDebug()

## @function printUsage()
#
# Print an optional error message followed by the usage string, then exit.
# NOTE(review): the argument placeholder was stripped from the usage string
# by the same angle-bracket-removal that broke the readline operators; a
# sensible placeholder has been reconstructed.
#
sub printUsage {
    my ($msg) = @_;
    if (defined $msg) {
        print 'Error! ' . $msg . "\n";
    }
    print 'Usage: update_data_locality.pl <results dir> [-debug]' . "\n\n";
    exit;
}
## printUsage()