root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_report.pl @ 27684

Revision 27684, 8.6 KB (checked in by jmt12, 7 years ago)

Adding natural sorting into report generation - so also needed to add INC building code typical of GS scripts

  • Property svn:executable set to *
#!/usr/bin/perl

use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # Ensure Greenstone Perl locations are in INC
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib');
  unshift (@INC, $ENV{'GSDLHOME'} . '/perllib/cpan');
  # we'll need the perl version number
  my ($version_number) = `perl-version.pl`;
  # strip the trailing newline so the version is safe to use in a path below
  chomp($version_number);
  if (defined $ENV{'GSDLEXTS'})
  {
    my @extensions = split(/:/,$ENV{'GSDLEXTS'});
    foreach my $e (@extensions)
    {
      my $ext_prefix = $ENV{'GSDLHOME'} . '/ext/' . $e;
      unshift (@INC, $ext_prefix . '/perllib');
      unshift (@INC, $ext_prefix . '/perllib/cpan');
      unshift (@INC, $ext_prefix . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $version_number);
    }
  }
}

use Devel::Peek;
use Sort::Key::Natural qw(natsort);
use Time::Local;

print "\n===== GS Hadoop Report =====\n";
print "Generate a report (CSV) from the output of a parallel processing Green-\n";
print "stone import, suitable for feeding into the Gantt chart generator.\n";
print "============================\n\n";

# Configuration
if (!defined $ARGV[0] || !-d $ARGV[0])
{
  die("usage: hadoop_report.pl <path to results>\n");
}
my $results_path = $ARGV[0];

# Read in hadoop.log and parse top level record
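# The regexes below expect lines resembling the following (illustrative
# examples only, not verbatim Hadoop output):
#   host:compute-node-01
#   13/07/01 09:15:02 INFO mapred.JobClient: Running job: job_201307010914_0001
#   CPU time spent (ms)=123456
# Timestamps are assumed to be yy/mm/dd hh:mm:ss; the first one seen is taken
# as the job start, and the latest one as the job end.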
print ' * Reading and parsing "hadoop.log"... ';
my $job_record = {'host'=>'', 'job'=>'', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
my $hadoop_log_path = $results_path . '/hadoop.log';
if (open(HLIN, '<:utf8', $hadoop_log_path))
{
  while (my $line = <HLIN>)
  {
    if ($line =~ /host:(.+)/)
    {
      $job_record->{'host'} = $1;
    }
    elsif ($line =~ /Running job: job_(\d+_\d+)/)
    {
      $job_record->{'job'} = $1;
    }
    elsif ($line =~ /CPU time spent \(ms\)=(\d+)/)
    {
      $job_record->{'cpu_time'} = $1;
    }
    elsif ($job_record->{'start'} == 0 && $line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
    {
      $job_record->{'start'} = timelocal($6, $5, $4, $3, ($2 - 1), $1);
    }
    elsif ($line =~ /(\d\d)\/(\d\d)\/(\d\d) (\d\d):(\d\d):(\d\d)/)
    {
      my $end = timelocal($6, $5, $4, $3, ($2 - 1), $1);
      if ($end > $job_record->{'end'})
      {
        $job_record->{'end'} = $end;
      }
    }
  }
  close(HLIN);
  if ($job_record->{'start'} == 0 || $job_record->{'end'} == 0)
  {
    die('Error! Failed to parse timing information from log: ' . $hadoop_log_path);
  }
}
else
{
  die('Error! Failed to open file for reading: ' . $hadoop_log_path);
}
print "Done!\n";

# Read in data_locality.csv (will be matched to task logs)
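# Going by the regex below, each row is assumed to have three columns,
# "<taskno>,<digit>,<flag>", where the final 0/1 flag records whether the
# task's input data was local to the node that processed it.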
print ' * Reading and parsing "data_locality.csv"... ';
my $data_was_local = {};
my $data_locality_csv_path = $results_path . '/data_locality.csv';
if (open(DLIN, '<:utf8', $data_locality_csv_path))
{
  while (my $line = <DLIN>)
  {
    if ($line =~ /(\d+),\d,(\d)/)
    {
      $data_was_local->{$1} = $2;
    }
  }
  close(DLIN);
}
else
{
  die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
}
print "Done!\n";

# Read in all task logs and parse task records
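# Task log filenames are expected to follow the pattern
# import-hadoop-<jobid>_m_<taskno>_<attempt>.log, for example (hypothetical):
#   import-hadoop-201307010914_0001_m_000003_0.log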
my $task_records = {};  # initialised so the record count below is safe even if no logs match
print " * Locating task logs...\n";
if (opendir(DH, $results_path))
{
  my @files = readdir(DH);
  foreach my $file (sort @files)
  {
    if ($file =~ /import-hadoop-(\d+_\d+)_m_(\d+)_\d+\.log/)
    {
      my $job_no = $1;
      my $task_no = $2;
      my $is_data_local = $data_was_local->{$task_no};
      my $task_record = {'host'=>'', 'cpu'=>0, 'job' => $job_no, 'task' => $task_no, 'start'=>0, 'end'=>0, 'cpu_time'=>0, 'data_locality'=>$is_data_local, 'file'=>''};
      print ' - Reading and parsing "' . $file . '"... ';
      my $task_log_path = $results_path . '/' . $file;
      my $io_time = 0;
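      # Accumulate I/O time from paired [IOS:<epoch>] and [IOE:<epoch>]
      # markers; an [IOS:...] with no matching [IOE:...] is treated as
      # running until the task's [Completed:<epoch>] marker (see below)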
      if (open(TIN, '<:utf8', $task_log_path))
      {
        my $io_start_time = 0;
        while (my $line = <TIN>)
        {
          if ($line =~ /\[Started:(\d+)\]/)
          {
            $task_record->{'start'} = $1;
          }
          elsif ($line =~ /\[Host:([^\]]+)\]/)
          {
            $task_record->{'host'} = $1;
          }
          elsif ($line =~ /\[CPU:(\d+)\]/)
          {
            $task_record->{'cpu'} = $1;
          }
          elsif ($line =~ /\[Map:([^\]]+)=>1\]/)
          {
            $task_record->{'file'} = $1;
          }
          elsif ($line =~ /\[IOS:(\d+)\]/)
          {
            $io_start_time = $1;
          }
          elsif ($io_start_time > 0 && $line =~ /\[IOE:(\d+)\]/)
          {
            $io_time += ($1 - $io_start_time);
            $io_start_time = 0;
          }
          elsif ($line =~ /\[Completed:(\d+)\]/)
          {
            my $end_time = $1;
            $task_record->{'end'} = $end_time;
            if ($io_start_time > 0)
            {
              $io_time += ($end_time - $io_start_time);
            }
          }
        }
        close(TIN);
        # Calculate CPU time (total time - IO time)
        $task_record->{'cpu_time'} = $task_record->{'end'} - $task_record->{'start'} - $io_time;
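        # Note: these epoch markers are assumed to be in seconds, so this
        # cpu_time is in seconds too; the master record's cpu_time arrives
        # in milliseconds and is divided by 1000 when the CSV is written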
        # Store this record
        $task_records->{$task_no} = $task_record;
      }
      else
      {
        die('Error! Failed to open file for reading: ' . $task_log_path);
      }
      print "Done!\n";
    }
  }
  closedir(DH);
}
else
{
  die('Error! Failed to open directory for reading: ' . $results_path);
}
print ' - Processed ' . scalar(keys(%{$task_records})) . " records\n";

# Generate compute-node records
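# Tasks are grouped under a host#cpu "worker" key: each compute node record
# spans from the earliest task start to the latest task end on that host/core
# pair, and accumulates the CPU time of all its tasks.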
print ' * Generating compute node records... ';
my $node_records = {};
foreach my $taskno (sort keys %{$task_records})
{
  my $task_record = $task_records->{$taskno};
  my $node_record = {'host'=>'', 'cpu'=>0, 'job' => '', 'start'=>0, 'end'=>0, 'cpu_time'=>0};
  # - retrieve any existing record
  my $worker_id = $task_record->{'host'} . '#' . $task_record->{'cpu'};
  if (defined $node_records->{$worker_id})
  {
    $node_record = $node_records->{$worker_id};
  }
  if ($node_record->{'host'} eq '')
  {
    $node_record->{'host'} = $task_record->{'host'};
  }
  if ($node_record->{'cpu'} == 0)
  {
    $node_record->{'cpu'} = $task_record->{'cpu'};
  }
  if ($node_record->{'job'} eq '')
  {
    $node_record->{'job'} = $task_record->{'job'};
  }
  if ($node_record->{'start'} == 0 || $task_record->{'start'} < $node_record->{'start'})
  {
    $node_record->{'start'} = $task_record->{'start'};
  }
  if ($node_record->{'end'} == 0 || $task_record->{'end'} > $node_record->{'end'})
  {
    $node_record->{'end'} = $task_record->{'end'};
  }
  $node_record->{'cpu_time'} += $task_record->{'cpu_time'};
  # - store it
  $node_records->{$worker_id} = $node_record;
}
print "Done!\n";

# Write out CSV of all information
my $report_csv_path = $results_path . '/timing.csv';
if (open(CSVOUT, '>:utf8', $report_csv_path))
{
  my $row_counter = 1;
  # Header
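  # Column notes: "dl" is the data locality flag read from data_locality.csv,
  # and "pid" appears to reference the parent row (0 for the master row, the
  # master's id of 1 for workers, and the owning worker's row id for tasks)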
  print CSVOUT "id,name,hostname,start,end,cputime,dl,pid,filename\n";
  # Master Record
  print CSVOUT $row_counter . ',M0,' . $job_record->{'host'} . ',' . $job_record->{'start'} . ',' . $job_record->{'end'} . ',' . ($job_record->{'cpu_time'} / 1000) . ",0,0,NA\n";
  $row_counter++;
  # For each compute node record
  my $known_workers = {};
  foreach my $worker_id (natsort(keys(%{$node_records})))
  {
    my $node_record = $node_records->{$worker_id};
    my $node_id = $row_counter;
    $row_counter++;
    my $csv_worker_id = 'W' . $node_record->{'cpu'};
    # Ensure we haven't used this id before - this should never trigger for
    # multicore CPUs, but will for clusters (as nearly all nodes will report
    # themselves as 'W0')
    if (defined $known_workers->{$csv_worker_id})
    {
      # Find a different worker id as this one is already in use
      my $counter = 0;
      $csv_worker_id = 'W' . $counter;
      while (defined $known_workers->{$csv_worker_id})
      {
        $counter++;
        $csv_worker_id = 'W' . $counter;
      }
    }
    $known_workers->{$csv_worker_id} = 1;
    print CSVOUT $node_id . ',' . $csv_worker_id . ',' . $node_record->{'host'} . ',' . $node_record->{'start'} . ',' . $node_record->{'end'} . ',' . $node_record->{'cpu_time'} . ",0,1,NA\n";
    # List the child task records
    foreach my $taskno (sort keys %{$task_records})
    {
      my $task_record = $task_records->{$taskno};
      if ($task_record->{'host'} . '#' . $task_record->{'cpu'} eq $worker_id)
      {
        print CSVOUT $row_counter . ',T' . ($task_record->{'task'} + 0) . ',' . $task_record->{'host'} . ',' . $task_record->{'start'} . ',' . $task_record->{'end'} . ',' . $task_record->{'cpu_time'} . ',' . $task_record->{'data_locality'} . ',' . $node_id . ',' . $task_record->{'file'} . "\n";
        $row_counter++;
      }
    }
  }
  close(CSVOUT);
}
else
{
  die('Error! Failed to open file for writing: ' . $report_csv_path);
}

print "Complete!\n\n";
exit;