source: gs2-extensions/parallel-building/trunk/src/bin/script/parse_task_info_from_hadoop_log.pl @ 27586

Last change on this file since 27586 was 27586, checked in by jmt12, 11 years ago

Updating script to take the date of the hadoop job into account when searching for the task tracker log to parse - just in case the log has been rolled

  • Property svn:executable set to *
File size: 6.4 KB
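
The rolled-log handling works by deriving a date suffix from the job ID: the script first looks for a dated log such as hadoop-<user>-jobtracker-<host>.log.2013-06-01 (hypothetical date) and falls back to the un-suffixed hadoop-<user>-jobtracker-<host>.log if no rolled log exists.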
#!/usr/bin/perl

# Pragma
use strict;
use warnings;

# Configuration
my $debug = 0;
# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HADOOP_PREFIX not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HADOOP_PREFIX'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
  # Ensure Greenstone Perl locations are in INC
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # we'll need the perl version number
  my ($version_number) = `perl-version.pl`;
  if (defined $ENV{'GSDLEXTS'})
  {
    my @extensions = split(/:/, $ENV{'GSDLEXTS'});
    foreach my $e (@extensions)
    {
      my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
      unshift (@INC, $ext_prefix . '/perllib');
      unshift (@INC, $ext_prefix . '/perllib/cpan');
      unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}

# Libraries (depend on the unshift calls above)
use Sort::Key::Natural qw(natsort);

# Begin
print "===== Parse Hadoop Log =====\n";

# 0. Init
if (!defined $ARGV[0])
{
  die("usage: parse_task_info_from_hadoop_log.pl <results dir>\n");
}
my $results_dir = $ARGV[0];
if (!-d $results_dir)
{
  die("Error! Can't find results directory: " . $results_dir . "\n");
}
print " Results directory: " . $results_dir . "\n";

54# 1. Determine job ID
55my $hadoop_log_path = &fileCat($results_dir, 'hadoop.log');
56if (!-e $hadoop_log_path)
57{
58 die("Error! Hadoop log file cannot be found: " . $hadoop_log_path . "\n");
59}
60print " Hadoop log path: " . $hadoop_log_path . "\n";
61print " * Determine JobID: ";
62my $job_id;
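# - the job ID appears in a JobClient progress line in hadoop.log, along the
#   lines of (hypothetical values): "INFO mapred.JobClient: Running job: job_201306010821_0001"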
my $result = `grep "Running job:" "$hadoop_log_path"`;
if ($result =~ /Running job: job_(\d+_\d+)/)
{
  $job_id = $1;
}
else
{
  die("Error! Failed to locate JobID\n");
}
print $job_id . "\n";
# - we'll need the date to locate the appropriate log file
my $log_date_suffix = '';
if ($job_id =~ /^(\d\d\d\d)(\d\d)(\d\d)/)
{
  $log_date_suffix = '.' . $1 . '-' . $2 . '-' . $3;
}
80# 2. Determine appropriate job tracker log
81my $username = `whoami`;
82chomp($username);
83print " Username: " . $username . "\n";
84my $hostname = `hostname`;
85chomp($hostname);
86print " Hostname: " . $hostname . "\n";
87my $jobtracker_log_path = &fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log' . $log_date_suffix);
88if (!-e $jobtracker_log_path)
89{
90 $jobtracker_log_path = &fileCat($ENV{'HADOOP_PREFIX'}, 'logs', 'hadoop-' . $username . '-jobtracker-' . $hostname . '.log');
91 if (!-e $jobtracker_log_path)
92 {
93 die("Error! Hadoop JobTracker log file cannot be found: " . $jobtracker_log_path . "\n");
94 }
95}
96print " Jobtracker log path: " . $jobtracker_log_path . "\n";
97my $data_locality_report_path = &fileCat($results_dir, 'data_locality.csv');
98print " Report path: " . $data_locality_report_path . "\n";
99
100# 3. Parse log
101print " * Parse JobTracker Log... ";
102my $tid_2_splits = {};
103my $tid_2_node = {};
104my $aid_2_node = {};
105if (open(JTLIN, '<', $jobtracker_log_path))
106{
107 my $line = '';
108 while ($line = <JTLIN>)
109 {
110 # Tips provide a match between task and file splits
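    # - matching lines look something like (hypothetical values):
    #   "... tip:task_201306010821_0001_m_000000 has split on node:/default-rack/compute-0-1"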
    if ($line =~ /tip:task_${job_id}(_m_\d+) has split on node:\/default-rack\/([^\.\r\n]+)/)
    {
      my $task_id = $job_id . $1;
      my $compute_node = $2;
      &debugPrint('found tip: ' . $task_id . ' => ' . $compute_node);
      if (!defined $tid_2_splits->{$task_id})
      {
        $tid_2_splits->{$task_id} = [$compute_node];
      }
      else
      {
        push(@{$tid_2_splits->{$task_id}}, $compute_node);
      }
    }
    # JobTracker (MAP) entries give us a mapping between task, attempt, and
    # compute node
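    # - matching lines look something like (hypothetical values):
    #   "... Adding task (MAP) 'attempt_201306010821_0001_m_000000_0' ... tracker_compute-0-1.local ..."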
    if ($line =~ /Adding task \(MAP\) 'attempt_${job_id}(_m_\d+)(_\d+)'.*tracker_([^\.]+)\.local/)
    {
      my $task_id = $job_id . $1;
      my $attempt_id = $job_id . $1 . $2;
      my $compute_node = $3;
      &debugPrint('found MAP: ' . $attempt_id . ' => ' . $compute_node);
      $aid_2_node->{$attempt_id} = {'compute_node' => $compute_node,
                                    'succeeded' => 0
                                   };
    }
    # Watch for attempt successes (so we can weed out failures)
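    # - matching lines look something like (hypothetical values):
    #   "... Task 'attempt_201306010821_0001_m_000000_0' has completed task_201306010821_0001_m_000000 successfully."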
    if ($line =~ /Task 'attempt_${job_id}(_m_\d+_\d+)' has completed .* successfully/)
    {
      my $attempt_id = $job_id . $1;
      &debugPrint('successful attempt: ' . $attempt_id);
      if (defined $aid_2_node->{$attempt_id})
      {
        $aid_2_node->{$attempt_id}->{'succeeded'} = 1;
      }
    }
  }
  close(JTLIN);
}
else
{
  die("Error! Failed to open JobTracker log for reading: " . $jobtracker_log_path . "\n");
}
print "Done\n";


# 4. Write CSV of information
print " * Writing Job Information... ";
&debugPrint("\nAttemptID\tComputeNode\tSucceeded");
foreach my $attempt_id (keys %{$aid_2_node})
{
  &debugPrint($attempt_id . "\t" . $aid_2_node->{$attempt_id}->{'compute_node'} . "\t" . $aid_2_node->{$attempt_id}->{'succeeded'});
}
&debugPrint("TaskID\tComputeNodeSplits");
foreach my $task_id (keys %{$tid_2_splits})
{
  &debugPrint($task_id . "\t" . join(',', natsort(@{$tid_2_splits->{$task_id}})));
}

# - open the CSV file and write out the combined information from above
if (open(CSVOUT, '>:utf8', $data_locality_report_path))
{
  print CSVOUT "TaskNo,AttemptNo,Data Local,Compute Node,Splits\n";
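  # - a resulting row might look like (hypothetical values):
  #   000000,0,1,"compute-0-1","compute-0-1,compute-0-2"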
  foreach my $attempt_id (natsort(keys %{$aid_2_node}))
  {
    my ($attempt_job_id, $task_number, $attempt_number) = $attempt_id =~ /^(\d+_\d+)_m_(\d+)_(\d+)/;
    my $task_id = $attempt_job_id . '_m_' . $task_number;
    my $compute_node = $aid_2_node->{$attempt_id}->{'compute_node'};
    my @splits = defined $tid_2_splits->{$task_id} ? @{$tid_2_splits->{$task_id}} : ();
    my $data_local = 0;
    if (grep($_ eq $compute_node, @splits))
    {
      $data_local = 1;
    }
    print CSVOUT $task_number . "," . $attempt_number . "," . $data_local . ",\"" . $compute_node . "\",\"" . join(',', natsort(@splits)) . "\"\n";
  }
  close(CSVOUT);
}
else
{
  die("Error! Failed to open file for writing: " . $data_locality_report_path . "\n");
}
print "Done\n";

195# 5. Done
196print "===== Complete! =====\n\n";
197exit;
198
199# Subs
200
sub debugPrint
{
  my ($msg) = @_;
  if ($debug)
  {
    print '[debug] ' . $msg . "\n";
  }
}

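# Join the given path components with '/' and collapse any repeated slashes,
# e.g. fileCat('/tmp/results/', 'hadoop.log') returns '/tmp/results/hadoop.log'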
sub fileCat
{
  my $path = join('/', @_);
  $path =~ s/\/\/+/\//g;
  return $path;
}