#!/usr/bin/perl
#
# update_data_locality.pl
#
# Rebuild data_locality.csv for a results directory, folding in the
# "original split" information recorded in hadoop.log regarding Split
# transforms (for instance when attempting to *prevent* data locality).

use strict;
use warnings;

use Sort::Key::Natural qw(natsort);

print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

# The single argument is the results directory to process.
if (!defined $ARGV[0] || !-d $ARGV[0]) {
    printUsage('Missing results directory');
}

# 0. Init
# Keyed by HDFS filepath; each value holds one task attempt's record.
my $data = {};    # filepath => { taskid, attempt, node, splits, original_splits }
my $data_locality_csv_path = $ARGV[0] . '/data_locality.csv';
my $hadoop_log_path        = $ARGV[0] . '/hadoop.log';

# 1. Read in current data locality comma separated file.
print " * Read current file: data_locality.csv\n";
open(my $dl_in, '<:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
my $csv_record_count = 0;
# Expected row shape: taskid,attempt_no,data_local,"compute node","splits"
while (my $line = <$dl_in>) {
    if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/) {
        my $taskid       = $1;
        my $attempt_no   = $2;
        my $compute_node = $3;
        my $splits       = $4;

        # Use the taskid/attempt to look up the manifest file and therein
        # the filepath the task attempt processed.
        my $filepath      = '';
        my $manifest_path = $ARGV[0] . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
        open(my $m_in, '<:utf8', $manifest_path)
            or die('Failed to open manifest for reading: ' . $manifest_path);
        while (my $line2 = <$m_in>) {
            # NOTE(review): the opening <Filename> tag appears to have been
            # stripped from this pattern at some point; the unanchored match
            # still works, so the pattern is preserved as-is — confirm
            # against a sample manifest.
            if ($line2 =~ /(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/) {
                # Normalise every protocol variant to a plain hdfs: prefix.
                $filepath = 'hdfs:' . $2;
            }
        }
        close($m_in);

        $data->{$filepath} = {
            'taskid'          => $taskid,
            'attempt'         => $attempt_no,
            'node'            => $compute_node,
            'splits'          => $splits,
            'original_splits' => '',    # filled in from hadoop.log in step 2
        };
        $csv_record_count++;
    }
}
close($dl_in);
print ' - read ' . $csv_record_count . " records.\n";

# 2.
# 2. Read in hadoop log looking for split mapping information.
print " * Read split information: hadoop.log\n";
open(my $hl_in, '<:utf8', $hadoop_log_path)
    or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
my $updated_count    = 0;
my $pending_filepath = '';
while (my $line = <$hl_in>) {
    if ($line =~ /^Filepath:\s(.+)$/) {
        # Remember which file the subsequent split lines refer to.
        $pending_filepath = $1;
    }
    elsif ($pending_filepath ne '') {
        # This is the information that was unintentionally lost.
        if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/) {
            my $original_splits = parseSplits($1);
            if (!defined $data->{$pending_filepath}) {
                die('Found split information for file not in data locality log: '
                        . $pending_filepath);
            }
            $data->{$pending_filepath}->{'original_splits'} = $original_splits;
            $pending_filepath = '';
            $updated_count++;
        }
    }
}
close($hl_in);
print ' - updated ' . $updated_count . " records.\n";

# 3. Write out the updated data locality file including information about
#    original splits (should we have found that information) and corrected
#    data locality flag.
print " * Writing out updated file: data_locality.csv\n";
open(my $dl_out, '>:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
my $written_count = 0;
print {$dl_out} "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
# Sort for deterministic output order (hash iteration order is random).
foreach my $filepath (sort keys %{$data}) {
    my $record       = $data->{$filepath};
    my $compute_node = $record->{'node'};

    # A task attempt is data-local iff its compute node hosted one of the
    # original splits.
    my @dl_nodes   = split(',', $record->{'original_splits'});
    my $data_local = 0;
    foreach my $dl_node (@dl_nodes) {
        $data_local = 1 if $dl_node eq $compute_node;
    }

    print {$dl_out} $record->{'taskid'} . ','
        . $record->{'attempt'} . ','
        . $data_local . ',"'
        . $compute_node . '","'
        . $record->{'splits'} . '","'
        . $record->{'original_splits'} . '"' . "\n";
    $written_count++;
}
# Buffered write errors surface at close — check it on the write handle.
close($dl_out)
    or die('Error! Failed to close file after writing: ' . $data_locality_csv_path);
print ' - wrote ' . $written_count . " records\n";

# 4.
# 4. Complete
print "\n========================= Complete =========================\n\n";
exit;

# parseSplits($raw_splits)
#
# Take the comma separated list of split hosts (the text between the
# square brackets of a hadoop.log "original split [...]" line), trim the
# whitespace around each entry, and return the hosts re-joined with
# commas in natural sort order so lists compare equal irrespective of
# the order hadoop logged them in.
sub parseSplits {
    my ($raw_splits) = @_;
    my @splits = split(/\s*,\s*/, $raw_splits);
    return join(',', natsort(@splits));
}

# printDebug($msg)
#
# Print a message to STDOUT prefixed with a [DEBUG] tag.
sub printDebug {
    my ($msg) = @_;
    print '[DEBUG] ' . $msg . "\n";
}

# printUsage($msg)
#
# Print an optional error message followed by the usage line, then exit.
sub printUsage {
    my ($msg) = @_;
    if (defined $msg) {
        print 'Error! ' . $msg . "\n";
    }
    # NOTE(review): the argument placeholder had been stripped from this
    # string (lost angle brackets); restored as <results_directory>.
    print 'Usage: update_data_locality.pl <results_directory>' . "\n\n";
    exit;
}