source: gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl@ 28768

Last change on this file since 28768 was 28647, checked in by jmt12, 11 years ago

Adding progress messages and making a debug message optional

  • Property svn:executable set to *
File size: 7.2 KB
#!/usr/bin/perl

use Cwd;
use Sort::Key::Natural qw(natsort);

use strict;
use warnings;

print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

our $debug = 0;

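# Invocation (see the argument handling below): any argument of the form
# -debug turns on debug output via printDebug(); every other argument is
# treated as a results directory to search, e.g. (placeholder names):
#   ./update_data_locality.pl -debug results-1 results-2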
# Loop through all the arguments, descending into directories to process data
# locality files
my $offset = 0;
my $found_dirs = 0;
while (defined $ARGV[$offset])
{
  my $argument = $ARGV[$offset];
  # handle -option arguments
  if ($argument =~ /^-(.+)$/)
  {
    my $option = $1;
    if ($option eq 'debug')
    {
      $debug = 1;
    }
  }
  # handle directories
  else
  {
    my $path = getcwd() . '/' . $argument;
    if (-d $path)
    {
      $found_dirs++;
      &searchForDataLocalityFiles($path);
    }
  }
  $offset++;
}
if ($found_dirs == 0)
{
  &printUsage('Missing results directory');
}

# Complete!
print "\n========================= Complete =========================\n\n";
exit;

################################################################################
################################# Functions ##################################
################################################################################


## @function searchForDataLocalityFiles()
#
sub searchForDataLocalityFiles
{
  my ($dir) = @_;
  # All of the following is *only* valid for directories
  if (-d $dir)
  {
    print ' * Searching for Data Locality information: ' . $dir . "\n";
    # Does this directory contain data_locality.csv?
    my $dl_csv_path = $dir . '/data_locality.csv';
    if (-f $dl_csv_path)
    {
      &updateDataLocalityForDirectory($dir);
    }
    # Also descend into child directories
    if (opendir(DIR, $dir))
    {
      my @files = readdir(DIR);
      closedir(DIR);
      foreach my $file (@files)
      {
        my $path = $dir . '/' . $file;
        # ignore hidden entries (including . and ..)
        if ($file =~ /^\./)
        {
        }
        elsif (-d $path)
        {
          &searchForDataLocalityFiles($path);
        }
      }
    }
    else
    {
      die('Error! Failed to open directory for reading: ' . $dir);
    }
  }
}
## searchForDataLocalityFiles() ##

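## @function updateDataLocalityForDirectory()
#
# Rewrites the data_locality.csv in the given directory: an OriginalSplits
# column is added using split information recovered from hadoop.log, and the
# Data Local flag is recalculated against those original splits. The previous
# CSV is kept as data_locality.csv.bak.
#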
sub updateDataLocalityForDirectory
{
  my ($dir_prefix) = @_;
  # 0. Init
  my $data = {};
  my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
  my $hadoop_log_path = $dir_prefix . '/hadoop.log';

  # 1. Read in current data locality comma separated file
  print " * Read current file: data_locality.csv\n";
  if (open(DLIN, '<:utf8', $data_locality_csv_path))
  {
    my $record_count = 0;
    while (my $line = <DLIN>)
    {
      &printDebug($line);
      # If the line already indicates this csv has been updated (by the
      # presence of an "OriginalSplits" column) we don't need to do anything
      # else
      if ($line =~ /OriginalSplits/)
      {
        print " ! File already updated...\n";
        close(DLIN);
        return;
      }
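      # A data row is expected to look like (per the regex below and the
      # header written in step 3):
      #   <TaskID>,<AttemptNo>,<DataLocal>,"<ComputeNode>","<Splits>"
      # where AttemptNo and DataLocal are single digits.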
      if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/)
      {
        my $taskid = $1;
        my $attempt_no = $2;
        my $compute_node = $3;
        my $splits = $4;

        # Use the taskid to look up the manifest file and therein the filepath
        my $filepath = '';
        my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
        &printDebug('searching for manifest file: ' . $manifest_path);
        if (open(MIN, '<:utf8', $manifest_path))
        {
          while (my $line2 = <MIN>)
          {
            if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
            {
              my $protocol = $1;
              # note: whichever protocol matched, the path is normalised to
              # an 'hdfs:' prefix to match the Filepath lines in hadoop.log
              $filepath = 'hdfs:' . $2;
            }
          }
          close(MIN);
        }
        else
        {
          die('Failed to open manifest for reading: ' . $manifest_path);
        }
        &printDebug('filepath: ' . $filepath);
        $data->{$filepath} = {'taskid' => $taskid,
                              'attempt' => $attempt_no,
                              'node' => $compute_node,
                              'splits' => $splits,
                              'original_splits' => ''};
        $record_count++;
      }
    }
    close(DLIN);
    print ' - read ' . $record_count . " records.\n";
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
  }

  # 2. Read in hadoop log looking for split mapping information
  print " * Read split information: hadoop.log\n";
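  # Two kinds of log lines matter here: a "Filepath: <path>" line naming the
  # file, followed (possibly after unrelated lines) by an
  # " - original split [...]" line listing the nodes that originally held
  # that file's split.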
  if (open(HLIN, '<:utf8', $hadoop_log_path))
  {
    my $record_count = 0;
    my $filepath = '';
    while (my $line = <HLIN>)
    {
      if ($line =~ /^Filepath:\s(.+)$/)
      {
        $filepath = $1;
      }
      elsif ($filepath ne '')
      {
        # this is the information that was unintentionally lost
        if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
        {
          my $original_splits = &parseSplits($1);
          if (!defined $data->{$filepath})
          {
            die('Found split information for file not in data locality log: ' . $filepath);
          }
          $data->{$filepath}->{'original_splits'} = $original_splits;
          $filepath = '';
          $record_count++;
        }
      }
    }
    close(HLIN);
    print ' - updated ' . $record_count . " records.\n";
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $hadoop_log_path);
  }

  # 3. Write out the updated data locality file including information about
  # original splits (should we have found that information) and corrected
  # data locality flag.
  # backup old
  my $backup_path = $data_locality_csv_path . '.bak';
  if (-f $backup_path)
  {
    unlink($backup_path);
  }
  rename($data_locality_csv_path, $backup_path);
  # write new
  print " * Writing out updated file: data_locality.csv\n";
  if (open(DLOUT, '>:utf8', $data_locality_csv_path))
  {
    my $record_count = 0;
    print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
    foreach my $filepath (keys %{$data})
    {
      my $record = $data->{$filepath};
      my $compute_node = $record->{'node'};
      my @dl_nodes = split(',', $record->{'original_splits'});
      my $data_local = 0;
      foreach my $dl_node (@dl_nodes)
      {
        if ($dl_node eq $compute_node)
        {
          $data_local = 1;
        }
      }
      print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
      $record_count++;
    }
    close(DLOUT);
    print ' - wrote ' . $record_count . " records\n";
  }
  else
  {
    die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
  }
}
## updateDataLocalityForDirectory() ##


## @function parseSplits()
#
sub parseSplits
{
  my ($raw_splits) = @_;
  my @splits = split(/\s*,\s*/, $raw_splits);
  return join(',', natsort(@splits));
}

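## @function printDebug()
#
# Prints the given message prefixed with [DEBUG], but only when the -debug
# flag was supplied on the command line.
#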
sub printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    chomp($msg);
    print '[DEBUG] ' . $msg . "\n";
  }
}

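## @function printUsage()
#
# Prints an optional error message followed by the usage string, then exits.
#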
sub printUsage
{
  my ($msg) = @_;
  if (defined $msg)
  {
    print 'Error! ' . $msg . "\n";
  }
  print 'Usage: update_data_locality.pl <results directory>' . "\n\n";
  exit;
}