source: gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl

Last change on this file was 28647, checked in by jmt12, 10 years ago

Adding progress messages and making a debug message optional

  • Property svn:executable set to *
File size: 7.2 KB
#!/usr/bin/perl

use Cwd;
use Sort::Key::Natural qw(natsort);

use strict;
use warnings;

print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

our $debug = 0;

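# Example invocation (illustrative; the -debug flag and directory arguments
# are inferred from the option handling below and from printUsage()):
#   perl update_data_locality.pl [-debug] <results directory> [<results directory> ...]
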
# Loop through all the arguments, descending into directories to process data
# locality files
my $offset = 0;
my $found_dirs = 0;
while (defined $ARGV[$offset])
{
  my $argument = $ARGV[$offset];
  # handle -option arguments
  if ($argument =~ /^-(.+)$/)
  {
    my $option = $1;
    if ($option eq 'debug')
    {
      $debug = 1;
    }
  }
  # handle directories
  else
  {
    my $path = getcwd() . '/' . $argument;
    if (-d $path)
    {
      $found_dirs++;
      &searchForDataLocalityFiles($path);
    }
  }
  $offset++;
}
if ($found_dirs == 0)
{
  &printUsage('Missing results directory');
}

# Complete!
print "\n========================= Complete =========================\n\n";
exit;

################################################################################
################################# Functions ##################################
################################################################################


## @function searchForDataLocalityFiles()
#
sub searchForDataLocalityFiles
{
  my ($dir) = @_;
  # All of the following is *only* valid for directories
  if (-d $dir)
  {
    print ' * Searching for Data Locality information: ' . $dir . "\n";
    # Does this directory contain data_locality.csv?
    my $dl_csv_path = $dir . '/data_locality.csv';
    if (-f $dl_csv_path)
    {
      &updateDataLocalityForDirectory($dir);
    }
    # Also descend into child directories
    if (opendir(DIR, $dir))
    {
      my @files = readdir(DIR);
      closedir(DIR);
      foreach my $file (@files)
      {
        my $path = $dir . '/' . $file;
        if ($file =~ /^\./)
        {
        }
        elsif (-d $path)
        {
          &searchForDataLocalityFiles($path);
        }
      }
    }
    else
    {
      die('Error! Failed to open directory for reading: ' . $dir);
    }
  }
}
## searchForDataLocalityFiles() ##

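## @function updateDataLocalityForDirectory()
#
# Rebuilds <dir>/data_locality.csv by re-reading the original CSV, matching
# each task against its manifest<taskid>_<attempt no>.xml file, recovering the
# original split locations from hadoop.log, and then writing the file back out
# with an extra OriginalSplits column and a recomputed data locality flag. The
# previous CSV is kept as data_locality.csv.bak.
#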
sub updateDataLocalityForDirectory
{
  my ($dir_prefix) = @_;
  # 0. Init
  my $data = {};
  my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
  my $hadoop_log_path = $dir_prefix . '/hadoop.log';

  # 1. Read in current data locality comma separated file
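  # Each data row is expected to look like the following (an illustrative
  # sketch inferred from the regular expression below and the header written
  # out in step 3):
  #   <taskid>,<attempt no>,<data local flag>,"<compute node>","<splits>"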
  print " * Read current file: data_locality.csv\n";
  if (open(DLIN, '<:utf8', $data_locality_csv_path))
  {
    my $record_count = 0;
    while (my $line = <DLIN>)
    {
      &printDebug($line);
      # If the line already indicates this csv has been updated (by the
      # presence of an "OriginalSplits" column) we don't need to do anything
      # else
      if ($line =~ /OriginalSplits/)
      {
        print " ! File already updated...\n";
        close(DLIN);
        return;
      }
      if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/)
      {
        my $taskid = $1;
        my $attempt_no = $2;
        my $compute_node = $3;
        my $splits = $4;

        # Use the taskid to look up the manifest file and therein the filepath
        my $filepath = '';
        my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
        &printDebug('searching for manifest file: ' . $manifest_path);
        if (open(MIN, '<:utf8', $manifest_path))
        {
          while (my $line2 = <MIN>)
          {
            if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
            {
              my $protocol = $1;
              $filepath = 'hdfs:' . $2;
            }
          }
          close(MIN);
        }
        else
        {
          die('Failed to open manifest for reading: ' . $manifest_path);
        }
        &printDebug('filepath: ' . $filepath);
        $data->{$filepath} = {'taskid' => $taskid,
                              'attempt' => $attempt_no,
                              'node' => $compute_node,
                              'splits' => $splits,
                              'original_splits' => ''};
        $record_count++;
      }
    }
    close(DLIN);
    print ' - read ' . $record_count . " records.\n";
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
  }

  # 2. Read in hadoop log looking for split mapping information
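  # The log is scanned for line pairs of the form (an illustrative sketch
  # inferred from the regular expressions below; the bracketed list is what
  # parseSplits() normalises):
  #   Filepath: <hdfs path to file>
  #    - original split [<node>, <node>, ...]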
  print " * Read split information: hadoop.log\n";
  if (open(HLIN, '<:utf8', $hadoop_log_path))
  {
    my $record_count = 0;
    my $filepath = '';
    while (my $line = <HLIN>)
    {
      if ($line =~ /^Filepath:\s(.+)$/)
      {
        $filepath = $1;
      }
      elsif ($filepath ne '')
      {
        # this is the information that was unintentionally lost
        if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
        {
          my $original_splits = &parseSplits($1);
          if (!defined $data->{$filepath})
          {
            die('Found split information for file not in data locality log: ' . $filepath);
          }
          $data->{$filepath}->{'original_splits'} = $original_splits;
          $filepath = '';
          $record_count++;
        }
      }
    }
    close(HLIN);
    print ' - updated ' . $record_count . " records.\n";
  }
  else
  {
    die('Error! Failed to open file for reading: ' . $hadoop_log_path);
  }

  # 3. Write out the updated data locality file including information about
  #    original splits (should we have found that information) and corrected
  #    data locality flag.
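  #    Note: a task is considered data local when its compute node appears in
  #    the recovered list of original split locations (see the comparison loop
  #    below).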
  # backup old
  my $backup_path = $data_locality_csv_path . '.bak';
  if (-f $backup_path)
  {
    unlink($backup_path);
  }
  rename($data_locality_csv_path, $backup_path);
  # write new
  print " * Writing out updated file: data_locality.csv\n";
  if (open(DLOUT, '>:utf8', $data_locality_csv_path))
  {
    my $record_count = 0;
    print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
    foreach my $filepath (keys %{$data})
    {
      my $record = $data->{$filepath};
      my $compute_node = $record->{'node'};
      my @dl_nodes = split(',', $record->{'original_splits'});
      my $data_local = 0;
      foreach my $dl_node (@dl_nodes)
      {
        if ($dl_node eq $compute_node)
        {
          $data_local = 1;
        }
      }
      print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
      $record_count++;
    }
    close(DLOUT);
    print ' - wrote ' . $record_count . " records\n";
  }
  else
  {
    die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
  }
}
## updateDataLocalityForDirectory() ##


## @function parseSplits()
#
sub parseSplits
{
  my ($raw_splits) = @_;
  my @splits = split(/\s*,\s*/, $raw_splits);
  return join(',', natsort(@splits));
}

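## @function printDebug()
#
# Prints the given message (with any trailing newline removed) prefixed by
# [DEBUG], but only when the -debug flag has been supplied.
#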
sub printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    chomp($msg);
    print '[DEBUG] ' . $msg . "\n";
  }
}

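## @function printUsage()
#
# Prints an optional error message followed by the usage string, then exits.
#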
sub printUsage
{
  my ($msg) = @_;
  if (defined $msg)
  {
    print 'Error! ' . $msg . "\n";
  }
  print 'Usage: update_data_locality.pl [-debug] <results directory>' . "\n\n";
  exit;
}