root/gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl @ 28357

Revision 28357, 4.5 KB (checked in by jmt12, 7 years ago)

Used to update the data_locality.csv file in cases where other transforms have been applied to the HDFS file-splitting algorithm, using information extracted from the hadoop.log file.

  • Property svn:executable set to *
Line 
1#!/usr/bin/perl
2
3use Sort::Key::Natural qw(natsort);
4
5use strict;
6use warnings;
7
print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

if (!defined $ARGV[0] || !-d $ARGV[0])
{
  printUsage('Missing results directory');
}

# 0. Init
my $results_dir = $ARGV[0];
my $data_locality_csv_path = $results_dir . '/data_locality.csv';
my $hadoop_log_path = $results_dir . '/hadoop.log';
# Every CSV record, in input order, so the rewritten file keeps the original
# row order (iterating over hash keys would emit rows in a random order).
my @records = ();
# File path => list of the records whose manifest names that file. A list is
# kept because several rows (e.g. repeated attempts of one task) can name the
# same file; a plain path => record hash silently kept only the last of them.
my %records_by_filepath = ();

# 1. Read in current data locality comma separated file. Rows look like
#      TaskID,AttemptNo,DataLocal,"ComputeNode","Splits"
#    optionally followed by the ,"OriginalSplits" column this script itself
#    appends, so re-running on an already updated file does not discard it.
print " * Read current file: data_locality.csv\n";
open(my $dl_in, '<:encoding(UTF-8)', $data_locality_csv_path)
  or die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
my $line_no = 0;
while (my $line = <$dl_in>)
{
  $line_no++;
  if ($line !~ /^([^,]+),(\d+),(\d),"([^"]+)","([^"]+)"(?:,"([^"]*)")?\r?$/)
  {
    # The first line is the column header; any other unparsable line would
    # be missing from the rewritten file, so report it instead of dropping
    # it silently.
    if ($line_no > 1 && $line =~ /\S/)
    {
      warn('Warning! Skipping unrecognised line ' . $line_no . ' in ' . $data_locality_csv_path . "\n");
    }
    next;
  }
  my $record = {'taskid'          => $1,
                'attempt'         => $2,
                'data_local'      => $3,
                'node'            => $4,
                'splits'          => $5,
                'original_splits' => (defined $6 ? $6 : '')};

  # Use the taskid and attempt to look up the manifest file and therein the
  # filepath. All supported protocol prefixes are normalised to 'hdfs:',
  # which is the form looked up against hadoop.log 'Filepath:' lines below.
  my $manifest_path = $results_dir . '/manifest' . $record->{'taskid'} . '_' . $record->{'attempt'} . '.xml';
  open(my $m_in, '<:encoding(UTF-8)', $manifest_path)
    or die('Failed to open manifest for reading: ' . $manifest_path);
  my $filepath = '';
  while (my $manifest_line = <$m_in>)
  {
    if ($manifest_line =~ /<Filename>(?:hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
    {
      $filepath = 'hdfs:' . $1;
    }
  }
  close($m_in);

  push(@records, $record);
  if ($filepath eq '')
  {
    # The record is still written back out, but no split information can
    # ever be matched to it.
    warn('Warning! No HDFS filename found in manifest: ' . $manifest_path . "\n");
  }
  else
  {
    push(@{$records_by_filepath{$filepath}}, $record);
  }
}
close($dl_in);
print ' - read ' . scalar(@records) . " records.\n";

# 2. Read in hadoop log looking for split mapping information. A
#    'Filepath: <path>' line names a file; the first ' - original split [...]'
#    line after it (before another Filepath line) carries that file's
#    original split locations - the information that was unintentionally lost.
print " * Read split information: hadoop.log\n";
open(my $hl_in, '<:encoding(UTF-8)', $hadoop_log_path)
  or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
my $updated_count = 0;
my $current_filepath = '';
while (my $line = <$hl_in>)
{
  if ($line =~ /^Filepath:\s(.+)$/)
  {
    $current_filepath = $1;
  }
  elsif ($current_filepath ne '' && $line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
  {
    my $original_splits = parseSplits($1);
    if (!exists $records_by_filepath{$current_filepath})
    {
      die('Found split information for file not in data locality log: ' . $current_filepath);
    }
    foreach my $record (@{$records_by_filepath{$current_filepath}})
    {
      $record->{'original_splits'} = $original_splits;
      $updated_count++;
    }
    $current_filepath = '';
  }
}
close($hl_in);
print ' - updated ' . $updated_count . " records.\n";

# 3. Write out the updated data locality file including information about
#    original splits (should we have found that information) and corrected
#    data locality flag. Where no original split information was found, the
#    flag already recorded in the input is kept rather than forced to 0.
print " * Writing out updated file: data_locality.csv\n";
# Write to a temporary sibling file and rename it over the original, so a
# failure part-way through cannot leave a truncated data_locality.csv.
my $tmp_path = $data_locality_csv_path . '.tmp';
open(my $dl_out, '>:encoding(UTF-8)', $tmp_path)
  or die('Error! Failed to open file for writing: ' . $tmp_path);
print {$dl_out} "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
foreach my $record (@records)
{
  my $compute_node = $record->{'node'};
  my $data_local = $record->{'data_local'};
  if ($record->{'original_splits'} ne '')
  {
    my @dl_nodes = split(/,/, $record->{'original_splits'});
    $data_local = (grep { $_ eq $compute_node } @dl_nodes) ? 1 : 0;
  }
  print {$dl_out} $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
}
# Buffered write errors only surface at close, so it must be checked.
close($dl_out)
  or die('Error! Failed to write file: ' . $tmp_path . ' (' . $! . ')');
rename($tmp_path, $data_locality_csv_path)
  or die('Error! Failed to replace ' . $data_locality_csv_path . ' (' . $! . ')');
print ' - wrote ' . scalar(@records) . " records\n";

# 4. Complete
print "\n========================= Complete =========================\n\n";
exit;
145
# Normalise the raw split-location list captured from a hadoop.log
# "original split [...]" line into a canonical string. Entries are trimmed,
# empty entries are dropped, and the rest are natural-sorted and re-joined
# with bare commas.
#
# @param  $raw_splits  the text found between the square brackets
# @return the normalised comma-joined list ('' when no entries remain)
sub parseSplits
{
  my ($raw_splits) = @_;
  return '' if !defined $raw_splits;
  # Trim the whole string first: splitting on /\s*,\s*/ alone leaves any
  # surrounding whitespace on the first and last entries, which would then
  # never compare equal to a bare compute-node name.
  (my $trimmed = $raw_splits) =~ s/^\s+|\s+$//g;
  my @splits = grep { $_ ne '' } split(/\s*,\s*/, $trimmed);
  return join(',', natsort(@splits));
}
152
# Emit one diagnostic line, tagged with a "[DEBUG] " prefix, on the
# currently selected output handle.
sub printDebug
{
  my $message = shift;
  print "[DEBUG] $message\n";
}
158
# Print an optional error message followed by the usage line, then terminate
# the script.
#
# @param $msg  optional error description; when given, all output goes to
#              STDERR and the script exits with status 1 so that shells and
#              calling scripts can detect the failure. Without it, usage is
#              printed to STDOUT and the exit status is 0.
sub printUsage
{
  my ($msg) = @_;
  my $is_error = defined $msg;
  my $out = $is_error ? \*STDERR : \*STDOUT;
  if ($is_error)
  {
    print {$out} 'Error! ' . $msg . "\n";
  }
  print {$out} 'Usage:  update_data_locality.pl <results directory>' . "\n\n";
  exit($is_error ? 1 : 0);
}
Note: See TracBrowser for help on using the browser.