Changeset 28647 for gs2-extensions
- Timestamp:
- 2013-11-20T12:49:26+13:00 (10 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl
r28357 r28647 1 1 #!/usr/bin/perl 2 2 3 use Cwd; 3 4 use Sort::Key::Natural qw(natsort); 4 5 … … 12 13 print "============================================================\n\n"; 13 14 14 if (!defined $ARGV[0] || !-d $ARGV[0]) 15 our $debug = 0; 16 17 # Loop through all the arguments, descending into directories to process data 18 # locality files 19 my $offset = 0; 20 my $found_dirs = 0; 21 while (defined $ARGV[$offset]) 22 { 23 my $argument = $ARGV[$offset]; 24 # handle -option arguments? 25 if ($argument =~ /^-(.+)$/) 26 { 27 my $option = $1; 28 if ($option eq 'debug') 29 { 30 $debug = 1; 31 } 32 } 33 # handle directories 34 else 35 { 36 my $path = getcwd() . '/' . $argument; 37 if (-d $path) 38 { 39 $found_dirs++; 40 &searchForDataLocalityFiles($path); 41 } 42 } 43 $offset++; 44 } 45 if ($found_dirs == 0) 15 46 { 16 47 &printUsage('Missing results directory'); 17 48 } 18 49 19 # 0. Init 20 my $data = {}; 21 my $data_locality_csv_path = $ARGV[0] . '/data_locality.csv'; 22 my $hadoop_log_path = $ARGV[0] . '/hadoop.log'; 23 24 # 1. Read in current data locality comma separated file 25 print " * Read current file: data_locality.csv\n"; 26 if (open(DLIN, '<:utf8', $data_locality_csv_path)) 27 { 28 my $record_count = 0; 29 while (my $line = <DLIN>) 30 { 31 if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/) 32 { 33 my $taskid = $1; 34 my $attempt_no = $2; 35 my $compute_node = $3; 36 my $splits = $4; 37 38 # Use the taskid to look up the manifest file and therein the filepath 39 my $filepath = ''; 40 my $manifest_path = $ARGV[0] . '/manifest' . $taskid . '_' . $attempt_no . '.xml'; 41 if (open(MIN, '<:utf8', $manifest_path)) 42 { 43 while (my $line2 = <MIN>) 44 { 45 if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/) 46 { 47 my $protocol = $1; 48 $filepath = 'hdfs:' . $2; 49 } 50 } 51 close(MIN); 52 } 53 else 54 { 55 die('Failed to open manifest for reading: ' . $manifest_path); 56 } 57 58 $data->{$filepath} = {'taskid' => $taskid, 59 'attempt' => $attempt_no, 60 'node' => $compute_node, 61 'splits' => $splits, 62 'original_splits' => ''}; 63 $record_count++; 64 } 65 } 66 close(DLIN); 67 print ' - read ' . $record_count . " records.\n"; 68 } 69 else 70 { 71 die('Error! Failed to open file for reading: ' . $data_locality_csv_path); 72 } 73 74 # 2. Read in hadoop log looking for split mapping information 75 print " * Read split information: hadoop.log\n"; 76 if (open(HLIN, '<:utf8', $hadoop_log_path)) 77 { 78 my $record_count = 0; 79 my $filepath = ''; 80 while (my $line = <HLIN>) 81 { 82 if ($line =~ /^Filepath:\s(.+)$/) 83 { 84 $filepath = $1; 85 } 86 elsif ($filepath ne '') 87 { 88 # this is the information that was unintentionally lost 89 if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/) 90 { 91 my $original_splits = &parseSplits($1); 92 if (!defined $data->{$filepath}) 93 { 94 die('Found split information for file not in data locality log: ' . $filepath); 95 } 96 $data->{$filepath}->{'original_splits'} = $original_splits; 97 $filepath = ''; 98 $record_count++; 99 } 100 } 101 } 102 close(HLIN); 103 print ' - updated ' . $record_count . " records.\n"; 104 } 105 else 106 { 107 die('Error! Failed to open file for reading: ' . $hadoop_log_path); 108 } 109 110 # 3. Write out the updated data locality file including information about 111 # original splits (should we have found that information) and corrected 112 # data locality flag. 113 print " * Writing out updated file: data_locality.csv\n"; 114 if (open(DLOUT, '>:utf8', $data_locality_csv_path)) 115 { 116 my $record_count = 0; 117 print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n"; 118 foreach my $filepath (keys %{$data}) 119 { 120 my $record = $data->{$filepath}; 121 my $compute_node = $record->{'node'}; 122 my @dl_nodes = split(',', $record->{'original_splits'}); 123 my $data_local = 0; 124 foreach my $dl_node (@dl_nodes) 125 { 126 if ($dl_node eq $compute_node) 127 { 128 $data_local = 1; 129 } 130 } 131 print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n"; 132 $record_count++; 133 } 134 close(DLOUT); 135 print ' - wrote ' . $record_count . " records\n"; 136 } 137 else 138 { 139 die('Error! Failed to open file for writing: ' . $data_locality_csv_path); 140 } 141 142 # 4. Complete 50 # Complete! 143 51 print "\n========================= Complete =========================\n\n"; 144 52 exit; 145 53 54 ################################################################################ 55 ################################# Functions ################################## 56 ################################################################################ 57 58 59 ## @function searchForDataLocalityFiles() 60 # 61 sub searchForDataLocalityFiles 62 { 63 my ($dir) = @_; 64 # All of the following is *only* valid for directories 65 if (-d $dir) 66 { 67 print ' * Searching for Data Locality information: ' . $dir . "\n"; 68 # Does this directory contain data_locality.csv? 69 my $dl_csv_path = $dir . '/data_locality.csv'; 70 if (-f $dl_csv_path) 71 { 72 &updateDataLocalityForDirectory($dir); 73 } 74 # Also descend into child directories 75 if (opendir(DIR, $dir)) 76 { 77 my @files = readdir(DIR); 78 closedir(DIR); 79 foreach my $file (@files) 80 { 81 my $path = $dir . '/' . $file; 82 if ($file =~ /^\./) 83 { 84 } 85 elsif (-d $path) 86 { 87 &searchForDataLocalityFiles($path); 88 } 89 } 90 } 91 else 92 { 93 die('Error! Failed to open directory for reading: ' . $dir); 94 } 95 } 96 } 97 ## searchForDataLocalityFiles() ## 98 99 100 sub updateDataLocalityForDirectory 101 { 102 my ($dir_prefix) = @_; 103 # 0. Init 104 my $data = {}; 105 my $data_locality_csv_path = $dir_prefix . '/data_locality.csv'; 106 my $hadoop_log_path = $dir_prefix . '/hadoop.log'; 107 108 # 1. Read in current data locality comma separated file 109 print " * Read current file: data_locality.csv\n"; 110 if (open(DLIN, '<:utf8', $data_locality_csv_path)) 111 { 112 my $record_count = 0; 113 while (my $line = <DLIN>) 114 { 115 &printDebug($line); 116 # If the line already indicates this csv has been updated (by the 117 # presence of an "OriginalSplits" column) we don't need to do anything 118 # else 119 if ($line =~ /OriginalSplits/) 120 { 121 print " ! File already updated...\n"; 122 close(DLIN); 123 return; 124 } 125 if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/) 126 { 127 my $taskid = $1; 128 my $attempt_no = $2; 129 my $compute_node = $3; 130 my $splits = $4; 131 132 # Use the taskid to look up the manifest file and therein the filepath 133 my $filepath = ''; 134 my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml'; 135 &printDebug('searching for manifest file: ' . $manifest_path); 136 if (open(MIN, '<:utf8', $manifest_path)) 137 { 138 while (my $line2 = <MIN>) 139 { 140 if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/) 141 { 142 my $protocol = $1; 143 $filepath = 'hdfs:' . $2; 144 } 145 } 146 close(MIN); 147 } 148 else 149 { 150 die('Failed to open manifest for reading: ' . $manifest_path); 151 } 152 &printDebug('filepath: ' . $filepath); 153 $data->{$filepath} = {'taskid' => $taskid, 154 'attempt' => $attempt_no, 155 'node' => $compute_node, 156 'splits' => $splits, 157 'original_splits' => ''}; 158 $record_count++; 159 } 160 } 161 close(DLIN); 162 print ' - read ' . $record_count . " records.\n"; 163 } 164 else 165 { 166 die('Error! Failed to open file for reading: ' . $data_locality_csv_path); 167 } 168 169 # 2. Read in hadoop log looking for split mapping information 170 print " * Read split information: hadoop.log\n"; 171 if (open(HLIN, '<:utf8', $hadoop_log_path)) 172 { 173 my $record_count = 0; 174 my $filepath = ''; 175 while (my $line = <HLIN>) 176 { 177 if ($line =~ /^Filepath:\s(.+)$/) 178 { 179 $filepath = $1; 180 } 181 elsif ($filepath ne '') 182 { 183 # this is the information that was unintentionally lost 184 if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/) 185 { 186 my $original_splits = &parseSplits($1); 187 if (!defined $data->{$filepath}) 188 { 189 die('Found split information for file not in data locality log: ' . $filepath); 190 } 191 $data->{$filepath}->{'original_splits'} = $original_splits; 192 $filepath = ''; 193 $record_count++; 194 } 195 } 196 } 197 close(HLIN); 198 print ' - updated ' . $record_count . " records.\n"; 199 } 200 else 201 { 202 die('Error! Failed to open file for reading: ' . $hadoop_log_path); 203 } 204 205 # 3. Write out the updated data locality file including information about 206 # original splits (should we have found that information) and corrected 207 # data locality flag. 208 # backup old 209 my $backup_path = $data_locality_csv_path . '.bak'; 210 if (-f $backup_path) 211 { 212 unlink($backup_path); 213 } 214 rename($data_locality_csv_path, $backup_path); 215 # write new 216 print " * Writing out updated file: data_locality.csv\n"; 217 if (open(DLOUT, '>:utf8', $data_locality_csv_path)) 218 { 219 my $record_count = 0; 220 print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n"; 221 foreach my $filepath (keys %{$data}) 222 { 223 my $record = $data->{$filepath}; 224 my $compute_node = $record->{'node'}; 225 my @dl_nodes = split(',', $record->{'original_splits'}); 226 my $data_local = 0; 227 foreach my $dl_node (@dl_nodes) 228 { 229 if ($dl_node eq $compute_node) 230 { 231 $data_local = 1; 232 } 233 } 234 print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n"; 235 $record_count++; 236 } 237 close(DLOUT); 238 print ' - wrote ' . $record_count . " records\n"; 239 } 240 else 241 { 242 die('Error! Failed to open file for writing: ' . $data_locality_csv_path); 243 } 244 } 245 ## updateDataLocalityForDirectory() ## 246 247 248 ## @function parseSplits() 249 # 146 250 sub parseSplits 147 251 { … … 154 258 { 155 259 my ($msg) = @_; 156 print '[DEBUG] ' . $msg . "\n"; 260 if ($debug) 261 { 262 chomp($msg); 263 print '[DEBUG] ' . $msg . "\n"; 264 } 157 265 } 158 266
Note:
See TracChangeset
for help on using the changeset viewer.