#!/usr/bin/perl

use Cwd;
use Sort::Key::Natural qw(natsort);

use strict;
use warnings;

print "=================== Update Data Locality ===================\n";
print "Update the data locality CSV based upon information found in\n";
print "the hadoop.log regarding Split transforms (for instance when\n";
print "attempting to *prevent* data locality)\n";
print "============================================================\n\n";

# Global debug flag, enabled by the -debug command line option and read by
# printDebug().
our $debug = 0;

# Loop through all the arguments, descending into directories to process data
# locality files
my $found_dirs = 0;
foreach my $argument (@ARGV)
{
    # handle -option arguments (currently only -debug is recognised; any other
    # option is silently ignored, matching the original behaviour)
    if ($argument =~ /^-(.+)$/)
    {
        my $option = $1;
        if ($option eq 'debug')
        {
            $debug = 1;
        }
    }
    # handle directories, resolved relative to the current working directory
    else
    {
        my $path = getcwd() . '/' . $argument;
        if (-d $path)
        {
            $found_dirs++;
            searchForDataLocalityFiles($path);
        }
    }
}
if ($found_dirs == 0)
{
    printUsage('Missing results directory');
}

# Complete!
print "\n========================= Complete =========================\n\n";
exit;
53 |
|
---|
54 | ################################################################################
|
---|
55 | ################################# Functions ##################################
|
---|
56 | ################################################################################
|
---|
57 |
|
---|
58 |
|
---|
## @function searchForDataLocalityFiles()
#
# Recursively searches the given directory for data_locality.csv files. Each
# directory that contains one is handed to updateDataLocalityForDirectory();
# all non-hidden child directories are then searched in turn. Non-directory
# arguments are ignored.
#
# @param $dir absolute path of the directory to search
#
sub searchForDataLocalityFiles
{
    my ($dir) = @_;
    # All of the following is *only* valid for directories
    if (-d $dir)
    {
        print ' * Searching for Data Locality information: ' . $dir . "\n";
        # Does this directory contain data_locality.csv?
        my $dl_csv_path = $dir . '/data_locality.csv';
        if (-f $dl_csv_path)
        {
            updateDataLocalityForDirectory($dir);
        }
        # Also descend into child directories
        opendir(my $dir_handle, $dir)
            or die('Error! Failed to open directory for reading: ' . $dir);
        my @files = readdir($dir_handle);
        closedir($dir_handle);
        foreach my $file (@files)
        {
            # skip hidden entries, including the special "." and ".." links
            next if $file =~ /^\./;
            my $path = $dir . '/' . $file;
            if (-d $path)
            {
                searchForDataLocalityFiles($path);
            }
        }
    }
}
## searchForDataLocalityFiles() ##
98 |
|
---|
99 |
|
---|
## @function updateDataLocalityForDirectory()
#
# Rebuilds <dir>/data_locality.csv with an extra "OriginalSplits" column
# recovered from the Split transform messages in <dir>/hadoop.log, and
# recalculates the "Data Local" flag against those original splits. The
# previous CSV is preserved as data_locality.csv.bak. Does nothing if the CSV
# already contains an OriginalSplits column.
#
# @param $dir_prefix directory containing data_locality.csv, hadoop.log and
#                    the per-task manifest<taskid>_<attempt>.xml files
#
sub updateDataLocalityForDirectory
{
    my ($dir_prefix) = @_;
    # 0. Init
    # Map: HDFS filepath => record parsed from the CSV, later augmented with
    # the original splits recovered from the hadoop log
    my $data = {};
    my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
    my $hadoop_log_path = $dir_prefix . '/hadoop.log';

    # 1. Read in current data locality comma separated file
    print " * Read current file: data_locality.csv\n";
    open(my $dl_in, '<:encoding(UTF-8)', $data_locality_csv_path)
        or die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
    my $read_count = 0;
    while (my $line = <$dl_in>)
    {
        printDebug($line);
        # If the line already indicates this csv has been updated (by the
        # presence of an "OriginalSplits" column) we don't need to do anything
        # else
        if ($line =~ /OriginalSplits/)
        {
            print " ! File already updated...\n";
            close($dl_in);
            return;
        }
        # Expected shape: TaskID,AttemptNo,DataLocal,"Compute Node","Splits"
        if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/)
        {
            my $taskid = $1;
            my $attempt_no = $2;
            my $compute_node = $3;
            my $splits = $4;

            # Use the taskid to look up the manifest file and therein the
            # filepath
            my $filepath = '';
            my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
            printDebug('searching for manifest file: ' . $manifest_path);
            open(my $manifest_in, '<:encoding(UTF-8)', $manifest_path)
                or die('Failed to open manifest for reading: ' . $manifest_path);
            while (my $manifest_line = <$manifest_in>)
            {
                # Normalise any of the supported protocols to the "hdfs:"
                # prefix; the last matching <Filename> element wins
                if ($manifest_line =~ /<Filename>(?:hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
                {
                    $filepath = 'hdfs:' . $1;
                }
            }
            close($manifest_in);
            printDebug('filepath: ' . $filepath);
            $data->{$filepath} = {'taskid' => $taskid,
                                  'attempt' => $attempt_no,
                                  'node' => $compute_node,
                                  'splits' => $splits,
                                  'original_splits' => ''};
            $read_count++;
        }
    }
    close($dl_in);
    print ' - read ' . $read_count . " records.\n";

    # 2. Read in hadoop log looking for split mapping information
    print " * Read split information: hadoop.log\n";
    open(my $hl_in, '<:encoding(UTF-8)', $hadoop_log_path)
        or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
    my $updated_count = 0;
    my $filepath = '';
    while (my $line = <$hl_in>)
    {
        if ($line =~ /^Filepath:\s(.+)$/)
        {
            $filepath = $1;
        }
        elsif ($filepath ne '')
        {
            # this is the information that was unintentionally lost
            if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
            {
                my $original_splits = parseSplits($1);
                if (!defined $data->{$filepath})
                {
                    die('Found split information for file not in data locality log: ' . $filepath);
                }
                $data->{$filepath}->{'original_splits'} = $original_splits;
                $filepath = '';
                $updated_count++;
            }
        }
    }
    close($hl_in);
    print ' - updated ' . $updated_count . " records.\n";

    # 3. Write out the updated data locality file including information about
    #    original splits (should we have found that information) and corrected
    #    data locality flag.
    # backup old
    my $backup_path = $data_locality_csv_path . '.bak';
    if (-f $backup_path)
    {
        unlink($backup_path);
    }
    rename($data_locality_csv_path, $backup_path)
        or die('Error! Failed to backup file: ' . $data_locality_csv_path);
    # write new
    print " * Writing out updated file: data_locality.csv\n";
    open(my $dl_out, '>:encoding(UTF-8)', $data_locality_csv_path)
        or die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
    my $written_count = 0;
    print {$dl_out} "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
    foreach my $fp (keys %{$data})
    {
        my $record = $data->{$fp};
        my $compute_node = $record->{'node'};
        # A task is data local when its compute node appears among the nodes
        # that originally held one of its splits
        my @dl_nodes = split(',', $record->{'original_splits'});
        my $data_local = 0;
        foreach my $dl_node (@dl_nodes)
        {
            if ($dl_node eq $compute_node)
            {
                $data_local = 1;
            }
        }
        print {$dl_out} $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
        $written_count++;
    }
    # check close on the write handle: buffered write errors surface here
    close($dl_out)
        or die('Error! Failed to write file: ' . $data_locality_csv_path);
    print ' - wrote ' . $written_count . " records\n";
}
## updateDataLocalityForDirectory() ##
246 |
|
---|
247 |
|
---|
## @function parseSplits()
#
# Takes the raw interior of an "original split [...]" log entry, splits it on
# commas (absorbing any surrounding whitespace), and returns the node names
# natural-sorted and re-joined with bare commas.
#
# @param $raw_splits comma separated node list as it appears in hadoop.log
# @return canonicalised, natural-sorted comma separated node list
#
sub parseSplits
{
    my ($raw_splits) = @_;
    my @nodes = split /\s*,\s*/, $raw_splits;
    my @ordered = natsort(@nodes);
    return join(',', @ordered);
}
256 |
|
---|
## @function printDebug()
#
# Prints the given message, chomped and prefixed with [DEBUG], but only when
# the global $debug flag is set; otherwise a no-op.
#
# @param $msg message to print (trailing newline, if any, is stripped)
#
sub printDebug
{
    my ($msg) = @_;
    return unless $debug;
    chomp($msg);
    print "[DEBUG] $msg\n";
}
266 |
|
---|
## @function printUsage()
#
# Prints an optional error message followed by the usage string, then exits
# the script.
#
# @param $msg optional error text to display before the usage line
#
sub printUsage
{
    my ($msg) = @_;
    print 'Error! ' . $msg . "\n" if defined $msg;
    print 'Usage: update_data_locality.pl <results directory>' . "\n\n";
    exit;
}