source: gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl@ 28357

Last change on this file since 28357 was 28357, checked in by jmt12, 11 years ago

Used to update the data_locality.csv file in the case where other transforms have been applied to the HDFS file splitting algorithm - with this information extracted from the hadoop.log file

  • Property svn:executable set to *
File size: 4.5 KB
Line 
1#!/usr/bin/perl
2
3use Sort::Key::Natural qw(natsort);
4
5use strict;
6use warnings;
7
# Announce what the script is about to do.
print "=================== Update Data Locality ===================\n",
      "Update the data locality CSV based upon information found in\n",
      "the hadoop.log regarding Split transforms (for instance when\n",
      "attempting to *prevent* data locality)\n",
      "============================================================\n\n";

# The first argument must name an existing directory; otherwise printUsage()
# reports the problem and ends the script.
unless (defined $ARGV[0] && -d $ARGV[0])
{
  printUsage('Missing results directory');
}

# 0. Init
# $data is a hash reference that the following steps fill with one record per
# file path.
my $data = {};
# Both input files are expected directly inside the results directory.
my $results_dir = $ARGV[0];
my $data_locality_csv_path = $results_dir . '/data_locality.csv';
my $hadoop_log_path = $results_dir . '/hadoop.log';
23
# 1. Read in current data locality comma separated file
#
# Each data row (TaskID,AttemptNo,Data Local,"Compute Node","Splits") is paired
# with its manifest file, from which the file path is taken. The record is
# stored in $data under that file path so the hadoop.log pass can look it up.
# Lines that do not match the row pattern (the header, for instance) are
# skipped. The existing "Data Local" flag is deliberately not kept as step 3
# writes a corrected one.
print " * Read current file: data_locality.csv\n";
{
  open(my $csv_fh, '<:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for reading: ' . $data_locality_csv_path . ' (' . $! . ')');
  my $record_count = 0;
  while (my $line = <$csv_fh>)
  {
    # The optional sixth column is the "OriginalSplits" value this script
    # itself appends when writing the file, so that a file which has already
    # been updated once can be read again. The attempt number may have more
    # than one digit.
    next unless ($line =~ /^([^,]+),(\d+),\d,"([^"]+)","([^"]+)"(?:,"([^"]*)")?$/);
    # Copy the captures straight away - the manifest match below resets them.
    my ($taskid, $attempt_no, $compute_node, $splits, $known_original_splits) = ($1, $2, $3, $4, $5);

    # Use the taskid to look up the manifest file and therein the filepath
    my $manifest_path = $ARGV[0] . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
    open(my $manifest_fh, '<:utf8', $manifest_path)
      or die('Failed to open manifest for reading: ' . $manifest_path . ' (' . $! . ')');
    my $filepath = '';
    while (my $manifest_line = <$manifest_fh>)
    {
      # Whatever protocol the manifest used, the key is normalised to hdfs:
      # If several lines match, the last one wins.
      if ($manifest_line =~ /<Filename>(?:hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
      {
        $filepath = 'hdfs:' . $1;
      }
    }
    close($manifest_fh);

    # Without a file path every such record would be stored under the same
    # empty key, and all but one would vanish when the CSV is rewritten.
    if ($filepath eq '')
    {
      die('Failed to find a recognised filename in manifest: ' . $manifest_path);
    }

    # NOTE(review): two rows resolving to the same file path (several attempts
    # at one file, presumably) still replace one another here - confirm that
    # keeping only the last row is intended.
    $data->{$filepath} = {'taskid' => $taskid,
                          'attempt' => $attempt_no,
                          'node' => $compute_node,
                          'splits' => $splits,
                          'original_splits' => (defined $known_original_splits ? $known_original_splits : '')};
    $record_count++;
  }
  close($csv_fh);
  print ' - read ' . $record_count . " records.\n";
}
73
# 2. Read in hadoop log looking for split mapping information
#
# The log is scanned for a "Filepath: ..." line followed, at some later point,
# by a " - original split [...]" line. The split list is normalised by
# parseSplits() and recorded against the file path, which must already be
# present in $data.
print " * Read split information: hadoop.log\n";
{
  open(my $log_fh, '<:utf8', $hadoop_log_path)
    or die('Error! Failed to open file for reading: ' . $hadoop_log_path);
  my $updated = 0;
  # File path of the most recent "Filepath:" line still waiting for its split
  # information; empty when there is none.
  my $pending_file = '';
  LOG_LINE: while (my $entry = <$log_fh>)
  {
    if ($entry =~ /^Filepath:\s(.+)$/)
    {
      $pending_file = $1;
      next LOG_LINE;
    }
    # Split lines are only of interest while a file path is pending.
    next LOG_LINE if ($pending_file eq '');
    # this is the information that was unintentionally lost
    next LOG_LINE unless ($entry =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/);
    my $sorted_splits = parseSplits($1);
    unless (defined $data->{$pending_file})
    {
      die('Found split information for file not in data locality log: ' . $pending_file);
    }
    $data->{$pending_file}->{'original_splits'} = $sorted_splits;
    # Only the first split line after each "Filepath:" line is used.
    $pending_file = '';
    $updated++;
  }
  close($log_fh);
  print ' - updated ' . $updated . " records.\n";
}
109
# 3. Write out the updated data locality file including information about
# original splits (should we have found that information) and corrected
# data locality flag.
#
# A record is flagged as data local when its compute node appears in its
# comma separated list of original splits. This replaces the file that was
# read in step 1, so every write is checked for failure.
print " * Writing out updated file: data_locality.csv\n";
{
  open(my $csv_out, '>:utf8', $data_locality_csv_path)
    or die('Error! Failed to open file for writing: ' . $data_locality_csv_path . ' (' . $! . ')');
  my $record_count = 0;
  print {$csv_out} "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n"
    or die('Error! Failed to write to file: ' . $data_locality_csv_path . ' (' . $! . ')');

  # Hash order is arbitrary; order the rows by task, attempt and finally file
  # path so that repeated runs produce the same file.
  my @ordered_filepaths = sort {
    $data->{$a}->{'taskid'} cmp $data->{$b}->{'taskid'}
      || $data->{$a}->{'attempt'} <=> $data->{$b}->{'attempt'}
      || $a cmp $b
  } keys %{$data};

  foreach my $filepath (@ordered_filepaths)
  {
    my $record = $data->{$filepath};
    my $compute_node = $record->{'node'};
    # grep in boolean context yields the number of matching nodes
    my $data_local = (grep { $_ eq $compute_node } split(',', $record->{'original_splits'})) ? 1 : 0;
    print {$csv_out} $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n"
      or die('Error! Failed to write to file: ' . $data_locality_csv_path . ' (' . $! . ')');
    $record_count++;
  }

  # Buffered write errors only surface when the handle is closed.
  close($csv_out)
    or die('Error! Failed to finish writing file: ' . $data_locality_csv_path . ' (' . $! . ')');
  print ' - wrote ' . $record_count . " records\n";
}
141
# 4. Complete
# Print the closing banner and end the script here, with the default
# (successful) exit status, before the subroutine definitions below.
print "\n========================= Complete =========================\n\n";
exit(0);
145
# Normalise a raw, comma separated list of splits into a canonical string.
#
# @param  $raw_splits  the comma separated text to normalise (the caller in
#                      this script passes the text found between the square
#                      brackets of an "original split" log line)
# @return the non-empty entries, without surrounding whitespace, in natural
#         sort order and joined by single commas; an empty string when there
#         is nothing to parse
sub parseSplits
{
  my ($raw_splits) = @_;
  return '' unless (defined $raw_splits);
  # split() below only swallows the whitespace around commas, so whitespace at
  # the very start or end of the list has to be removed separately - otherwise
  # the first and last entries could never equal a compute node name.
  $raw_splits =~ s/^\s+|\s+$//g;
  # Empty entries (from doubled or leading commas) carry no information.
  my @splits = grep { $_ ne '' } split(/\s*,\s*/, $raw_splits);
  return join(',', natsort(@splits));
}
152
# Write a diagnostic message to standard output, prefixed with "[DEBUG] " and
# followed by a newline.
#
# @param  $msg  the text to report
sub printDebug
{
  my $msg = shift;
  my $debug_line = '[DEBUG] ' . $msg . "\n";
  print $debug_line;
}
158
# Print the command line usage, optionally preceded by an error message, and
# terminate the script. This subroutine never returns.
#
# @param  $msg  (optional) description of what was wrong with the invocation;
#               when given it is printed first and the script exits with a
#               non-zero status so that calling shells and scripts can detect
#               the failure. Without it the exit status is zero.
sub printUsage
{
  my ($msg) = @_;
  my $exit_status = 0;
  if (defined $msg)
  {
    print 'Error! ' . $msg . "\n";
    $exit_status = 1;
  }
  print 'Usage: update_data_locality.pl <results directory>' . "\n\n";
  exit($exit_status);
}
Note: See TracBrowser for help on using the repository browser.