root/gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl @ 28647

Revision 28647, 7.2 KB (checked in by jmt12, 7 years ago)

Adding progress messages and making a debug message optional

  • Property svn:executable set to *
Line 
1#!/usr/bin/perl
2
3use Cwd;
4use Sort::Key::Natural qw(natsort);
5
6use strict;
7use warnings;
8
print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

# Global debug flag read by printDebug(); switched on by the -debug option.
our $debug = 0;

# Loop through all the arguments in order, descending into directories to
# process data locality files. Options only affect the directories that
# follow them on the command line.
my $found_dirs = 0;
foreach my $argument (@ARGV)
{
  # handle -option arguments
  if ($argument =~ /^-(.+)$/)
  {
    my $option = $1;
    if ($option eq 'debug')
    {
      $debug = 1;
    }
    else
    {
      print STDERR 'Warning! Ignoring unknown option: -' . $option . "\n";
    }
  }
  # handle directories
  else
  {
    # Absolute paths are used as given; only relative paths are resolved
    # against the current working directory (prefixing an absolute path with
    # the cwd would produce a path that does not exist).
    my $path = ($argument =~ /^\//) ? $argument : getcwd() . '/' . $argument;
    if (-d $path)
    {
      $found_dirs++;
      searchForDataLocalityFiles($path);
    }
    else
    {
      print STDERR 'Warning! Skipping argument that is not a directory: ' . $argument . "\n";
    }
  }
}
if ($found_dirs == 0)
{
  printUsage('Missing results directory');
}

# Complete!
print "\n========================= Complete =========================\n\n";
exit;
53
54################################################################################
55#################################  Functions  ##################################
56################################################################################
57
58
## @function searchForDataLocalityFiles()
#
# Recursively walk a directory tree and call updateDataLocalityForDirectory()
# on every directory that directly contains a 'data_locality.csv' file.
#
# Entries whose name starts with '.' (including '.' and '..') are skipped.
# Child directories are visited in sorted name order so that the progress
# output is the same from run to run.
#
# @param  dir  path of the directory to search; a path that is not a
#              directory is silently ignored
# Dies if a directory cannot be opened for reading.
#
sub searchForDataLocalityFiles
{
  my ($dir) = @_;
  # All of the following is *only* valid for directories
  unless (-d $dir)
  {
    return;
  }
  print ' * Searching for Data Locality information: ' . $dir . "\n";
  # Does this directory contain data_locality.csv?
  if (-f $dir . '/data_locality.csv')
  {
    updateDataLocalityForDirectory($dir);
  }
  # Also descend into child directories. A lexical handle is used because
  # this function is recursive and a bareword handle would be shared between
  # every level of the recursion.
  opendir(my $dir_handle, $dir)
    or die('Error! Failed to open directory for reading: ' . $dir . ' (' . $! . ')');
  my @files = sort readdir($dir_handle);
  closedir($dir_handle);
  foreach my $file (@files)
  {
    # skip hidden entries as well as '.' and '..'
    next if ($file =~ /^\./);
    my $path = $dir . '/' . $file;
    if (-d $path)
    {
      searchForDataLocalityFiles($path);
    }
  }
}
## searchForDataLocalityFiles() ##
98
99
## @function updateDataLocalityForDirectory()
#
# Rewrite '<dir_prefix>/data_locality.csv' so that it carries an extra
# "OriginalSplits" column and a recomputed "Data Local" flag.
#
#  1. Each CSV row of the form  taskid,attempt,flag,"node","splits"  is read
#     and the matching 'manifest<taskid>_<attempt>.xml' is scanned for the
#     file the task processed.
#  2. 'hadoop.log' is scanned for "Filepath: ..." lines followed by a
#     " - original split [...]" line; the split list is attached to every
#     record that processed that file.
#  3. The old CSV is renamed to 'data_locality.csv.bak' and a new one is
#     written. Rows keep the order of the input file. The flag is set to 1
#     when the compute node appears in the original splits; rows for which
#     no original split was logged keep the flag they already had.
#
# If the CSV already contains "OriginalSplits" nothing is changed.
#
# @param  dir_prefix  directory holding data_locality.csv, hadoop.log and the
#                     manifest files
# Dies when any of the files cannot be opened, when the log mentions a file
# that has no CSV record, or when the backup or the final write fails.
#
sub updateDataLocalityForDirectory
{
  my ($dir_prefix) = @_;
  # 0. Init
  # Records are held in input order; the index maps a file path onto every
  # record (one per task attempt) that processed it, so that retried attempts
  # of the same file do not overwrite one another.
  my @records = ();
  my %records_for = ();
  my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
  my $hadoop_log_path = $dir_prefix . '/hadoop.log';

  # 1. Read in current data locality comma separated file
  print "   * Read current file: data_locality.csv\n";
  open(my $dl_in, '<:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for reading: ' . $data_locality_csv_path . ' (' . $! . ')');
  while (my $line = <$dl_in>)
  {
    printDebug($line);
    # If the line already indicates this csv has been updated (by the
    # presence of an "OriginalSplits" column) we don't need to do anything
    # else
    if ($line =~ /OriginalSplits/)
    {
      print "   ! File already updated...\n";
      close($dl_in);
      return;
    }
    # strip the line ending (including any carriage return) before matching
    $line =~ s/[\r\n]+$//;
    if ($line =~ /^([^,]+),(\d+),(\d),"([^"]+)","([^"]+)"$/)
    {
      my ($taskid, $attempt_no, $data_local, $compute_node, $splits) = ($1, $2, $3, $4, $5);

      # Use the taskid to look up the manifest file and therein the filepath
      my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
      printDebug('searching for manifest file: ' . $manifest_path);
      my $filepath = _readManifestFilepath($manifest_path);
      printDebug('filepath: ' . $filepath);

      my $record = {'taskid' => $taskid,
                    'attempt' => $attempt_no,
                    'data_local' => $data_local,
                    'node' => $compute_node,
                    'splits' => $splits,
                    'original_splits' => ''};
      push(@records, $record);
      if ($filepath eq '')
      {
        # Keep the row (it is still written back out) but it can never be
        # matched against the log
        print STDERR '   ! Warning: no filename found in manifest: ' . $manifest_path . "\n";
      }
      else
      {
        push(@{$records_for{$filepath}}, $record);
      }
    }
  }
  close($dl_in);
  print '   - read ' . scalar(@records) . " records.\n";

  # 2. Read in hadoop log looking for split mapping information
  print "   * Read split information: hadoop.log\n";
  open(my $hl_in, '<:utf8', $hadoop_log_path)
    or die('Error! Failed to open file for reading: ' . $hadoop_log_path . ' (' . $! . ')');
  my $updated_count = 0;
  my $filepath = '';
  while (my $line = <$hl_in>)
  {
    $line =~ s/[\r\n]+$//;
    if ($line =~ /^Filepath:\s(.+)$/)
    {
      $filepath = $1;
    }
    # this is the information that was unintentionally lost
    elsif ($filepath ne '' && $line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
    {
      # copy the capture before calling out, as $1 is a global
      my $raw_splits = $1;
      my $original_splits = parseSplits($raw_splits);
      if (!exists $records_for{$filepath})
      {
        die('Found split information for file not in data locality log: ' . $filepath);
      }
      foreach my $record (@{$records_for{$filepath}})
      {
        $record->{'original_splits'} = $original_splits;
      }
      $filepath = '';
      $updated_count++;
    }
  }
  close($hl_in);
  print '   - updated ' . $updated_count . " records.\n";

  # 3. Write out the updated data locality file including information about
  #    original splits (should we have found that information) and corrected
  #    data locality flag.
  # backup old - and stop if that fails, as opening the CSV for writing would
  # otherwise truncate the only copy of the data
  my $backup_path = $data_locality_csv_path . '.bak';
  if (-f $backup_path)
  {
    unlink($backup_path);
  }
  rename($data_locality_csv_path, $backup_path)
    or die('Error! Failed to back up file to: ' . $backup_path . ' (' . $! . ')');
  # write new
  print "   * Writing out updated file: data_locality.csv\n";
  open(my $dl_out, '>:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for writing: ' . $data_locality_csv_path . ' (' . $! . ')');
  print {$dl_out} "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
  foreach my $record (@records)
  {
    my $compute_node = $record->{'node'};
    # Without original split information there is nothing to correct the
    # flag against, so the value read from the old file is kept
    my $data_local = $record->{'data_local'};
    if ($record->{'original_splits'} ne '')
    {
      my @dl_nodes = split(',', $record->{'original_splits'});
      $data_local = (grep { $_ eq $compute_node } @dl_nodes) ? 1 : 0;
    }
    print {$dl_out} $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
  }
  # buffered write errors only surface when the handle is closed
  close($dl_out)
    or die('Error! Failed to finish writing file: ' . $data_locality_csv_path . ' (' . $! . ')');
  print '   - wrote ' . scalar(@records) . " records\n";
}
## updateDataLocalityForDirectory() ##


## @function _readManifestFilepath()
#
# Scan a manifest file for <Filename>PROTOCOL:PATH</Filename> elements, where
# PROTOCOL is one of hdfs, HDFSShell or ThriftFS, and return 'hdfs:' . PATH
# for the last such element found.
#
# @param  manifest_path  path of the manifest XML file to read
# @return the normalised file path, or '' when no matching element exists
# Dies if the manifest cannot be opened.
#
sub _readManifestFilepath
{
  my ($manifest_path) = @_;
  open(my $manifest_in, '<:utf8', $manifest_path)
    or die('Failed to open manifest for reading: ' . $manifest_path . ' (' . $! . ')');
  my $filepath = '';
  while (my $line = <$manifest_in>)
  {
    if ($line =~ /<Filename>(?:hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
    {
      $filepath = 'hdfs:' . $1;
    }
  }
  close($manifest_in);
  return $filepath;
}
## _readManifestFilepath() ##
246
247
## @function parseSplits()
#
# Turn the comma separated list captured from a log line into a canonical
# form: each entry has surrounding whitespace removed, empty entries are
# dropped, and the remainder is naturally sorted and joined with single
# commas. Trimming matters because the result is later compared, entry by
# entry, against compute node names using string equality.
#
# @param  raw_splits  the text found between the square brackets
# @return the normalised list as a string ('' when there are no entries)
#
sub parseSplits
{
  my ($raw_splits) = @_;
  if (!defined $raw_splits)
  {
    return '';
  }
  my @splits = ();
  foreach my $split (split(/,/, $raw_splits))
  {
    $split =~ s/^\s+|\s+$//g;
    if (length($split))
    {
      push(@splits, $split);
    }
  }
  return join(',', natsort(@splits));
}
256
## @function printDebug()
#
# Print a message prefixed with '[DEBUG] ' when the global $debug flag is
# set; otherwise do nothing. A trailing newline on the message is removed
# first so that exactly one is printed.
#
# @param  msg  the text to print
#
sub printDebug
{
  my $text = shift;
  if ($debug)
  {
    chomp($text);
    print "[DEBUG] $text\n";
  }
}
266
## @function printUsage()
#
# Print an optional error message followed by the usage line and then
# terminate the script.
#
# @param  msg  (optional) description of what was wrong with the invocation
# Never returns. The exit status is 1 when an error message was supplied, so
# that calling scripts can detect the failure, and 0 otherwise.
#
sub printUsage
{
  my ($msg) = @_;
  if (defined $msg)
  {
    print 'Error! ' . $msg . "\n";
  }
  print 'Usage:  update_data_locality.pl [-debug] <results directory> [<results directory> ...]' . "\n\n";
  exit(defined $msg ? 1 : 0);
}
Note: See TracBrowser for help on using the browser.