Ignore:
Timestamp:
2013-11-20T12:49:26+13:00 (10 years ago)
Author:
jmt12
Message:

Adding progress messages and making a debug message optional

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl

    r28357 r28647  
    11#!/usr/bin/perl
    22
     3use Cwd;
    34use Sort::Key::Natural qw(natsort);
    45
     
    1213print "============================================================\n\n";
    1314
    14 if (!defined $ARGV[0] || !-d $ARGV[0])
     15our $debug = 0;
     16
     17# Loop through all the arguments, descending into directories to process data
     18# locality files
     19my $offset = 0;
     20my $found_dirs = 0;
     21while (defined $ARGV[$offset])
     22{
     23  my $argument = $ARGV[$offset];
     24  # handle -option arguments?
     25  if ($argument =~ /^-(.+)$/)
     26  {
     27    my $option = $1;
     28    if ($option eq 'debug')
     29    {
     30      $debug = 1;
     31    }
     32  }
     33  # handle directories
     34  else
     35  {
     36    my $path = getcwd() . '/' . $argument;
     37    if (-d $path)
     38    {
     39      $found_dirs++;
     40      &searchForDataLocalityFiles($path);
     41    }
     42  }
     43  $offset++;
     44}
     45if ($found_dirs == 0)
    1546{
    1647  &printUsage('Missing results directory');
    1748}
    1849
    19 # 0. Init
    20 my $data = {};
    21 my $data_locality_csv_path = $ARGV[0] . '/data_locality.csv';
    22 my $hadoop_log_path = $ARGV[0] . '/hadoop.log';
    23 
    24 # 1. Read in current data locality comma separated file
    25 print " * Read current file: data_locality.csv\n";
    26 if (open(DLIN, '<:utf8', $data_locality_csv_path))
    27 {
    28   my $record_count = 0;
    29   while (my $line = <DLIN>)
    30   {
    31     if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/)
    32     {
    33       my $taskid       = $1;
    34       my $attempt_no   = $2;
    35       my $compute_node = $3;
    36       my $splits       = $4;
    37 
    38       # Use the taskid to look up the manifest file and therein the filepath
    39       my $filepath = '';
    40       my $manifest_path = $ARGV[0] . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
    41       if (open(MIN, '<:utf8', $manifest_path))
    42       {
    43         while (my $line2 = <MIN>)
    44         {
    45           if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
    46           {
    47             my $protocol = $1;
    48             $filepath = 'hdfs:' . $2;
    49           }
    50         }
    51         close(MIN);
    52       }
    53       else
    54       {
    55         die('Failed to open manifest for reading: ' . $manifest_path);
    56       }
    57 
    58       $data->{$filepath} = {'taskid' => $taskid,
    59                             'attempt' => $attempt_no,
    60                             'node' => $compute_node,
    61                             'splits' => $splits,
    62                             'original_splits' => ''};
    63       $record_count++;
    64     }
    65   }
    66   close(DLIN);
    67   print ' - read ' . $record_count . " records.\n";
    68 }
    69 else
    70 {
    71  die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
    72 }
    73 
    74 # 2. Read in hadoop log looking for split mapping information
    75 print " * Read split information: hadoop.log\n";
    76 if (open(HLIN, '<:utf8', $hadoop_log_path))
    77 {
    78   my $record_count = 0;
    79   my $filepath = '';
    80   while (my $line = <HLIN>)
    81   {
    82     if ($line =~ /^Filepath:\s(.+)$/)
    83     {
    84       $filepath = $1;
    85     }
    86     elsif ($filepath ne '')
    87     {
    88       # this is the information that was unintentionally lost
    89       if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
    90       {
    91         my $original_splits = &parseSplits($1);
    92         if (!defined $data->{$filepath})
    93         {
    94           die('Found split information for file not in data locality log: ' . $filepath);
    95         }
    96         $data->{$filepath}->{'original_splits'} = $original_splits;
    97         $filepath = '';
    98         $record_count++;
    99       }
    100     }
    101   }
    102   close(HLIN);
    103   print ' - updated ' . $record_count . " records.\n";
    104 }
    105 else
    106 {
    107  die('Error! Failed to open file for reading: ' . $hadoop_log_path);
    108 }
    109 
    110 # 3. Write out the updated data locality file including information about
    111 #    original splits (should we have found that information) and corrected
    112 #    data locality flag.
    113 print " * Writing out updated file: data_locality.csv\n";
    114 if (open(DLOUT, '>:utf8', $data_locality_csv_path))
    115 {
    116   my $record_count = 0;
    117   print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
    118   foreach my $filepath (keys %{$data})
    119   {
    120     my $record = $data->{$filepath};
    121     my $compute_node = $record->{'node'};
    122     my @dl_nodes = split(',', $record->{'original_splits'});
    123     my $data_local = 0;
    124     foreach my $dl_node (@dl_nodes)
    125     {
    126       if ($dl_node eq $compute_node)
    127       {
    128         $data_local = 1;
    129       }
    130     }
    131     print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
    132     $record_count++;
    133   }
    134   close(DLOUT);
    135   print ' - wrote ' . $record_count . " records\n";
    136 }
    137 else
    138 {
    139   die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
    140 }
    141 
    142 # 4. Complete
     50# Complete!
    14351print "\n========================= Complete =========================\n\n";
    14452exit;
    14553
     54################################################################################
     55#################################  Functions  ##################################
     56################################################################################
     57
     58
     59## @function searchForDataLocalityFiles()
     60#
     61sub searchForDataLocalityFiles
     62{
     63  my ($dir) = @_;
     64  # All of the following is *only* valid for directories
     65  if (-d $dir)
     66  {
     67    print ' * Searching for Data Locality information: ' . $dir . "\n";
     68    # Does this directory contain data_locality.csv?
     69    my $dl_csv_path = $dir . '/data_locality.csv';
     70    if (-f $dl_csv_path)
     71    {
     72      &updateDataLocalityForDirectory($dir);
     73    }
     74    # Also descend into child directories
     75    if (opendir(DIR, $dir))
     76    {
     77      my @files = readdir(DIR);
     78      closedir(DIR);
     79      foreach my $file (@files)
     80      {
     81        my $path = $dir . '/' . $file;
     82        if ($file =~ /^\./)
     83        {
     84        }
     85        elsif (-d $path)
     86        {
     87          &searchForDataLocalityFiles($path);
     88        }
     89      }
     90    }
     91    else
     92    {
     93      die('Error! Failed to open directory for reading: ' . $dir);
     94    }
     95  }
     96}
     97## searchForDataLocalityFiles() ##
     98
     99
     100sub updateDataLocalityForDirectory
     101{
     102  my ($dir_prefix) = @_;
     103  # 0. Init
     104  my $data = {};
     105  my $data_locality_csv_path = $dir_prefix . '/data_locality.csv';
     106  my $hadoop_log_path = $dir_prefix . '/hadoop.log';
     107
     108  # 1. Read in current data locality comma separated file
     109  print "   * Read current file: data_locality.csv\n";
     110  if (open(DLIN, '<:utf8', $data_locality_csv_path))
     111  {
     112    my $record_count = 0;
     113    while (my $line = <DLIN>)
     114    {
     115      &printDebug($line);
     116      # If the line already indicates this csv has been updated (by the
     117      # presence of an "OriginalSplits" column) we don't need to do anything
     118      # else
     119      if ($line =~ /OriginalSplits/)
     120      {
     121        print "   ! File already updated...\n";
     122        close(DLIN);
     123        return;
     124      }
     125      if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/)
     126      {
     127        my $taskid       = $1;
     128        my $attempt_no   = $2;
     129        my $compute_node = $3;
     130        my $splits       = $4;
     131
     132        # Use the taskid to look up the manifest file and therein the filepath
     133        my $filepath = '';
     134        my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml';
     135        &printDebug('searching for manifest file: ' . $manifest_path);
     136        if (open(MIN, '<:utf8', $manifest_path))
     137        {
     138          while (my $line2 = <MIN>)
     139          {
     140            if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/)
     141            {
     142              my $protocol = $1;
     143              $filepath = 'hdfs:' . $2;
     144            }
     145          }
     146          close(MIN);
     147        }
     148        else
     149        {
     150          die('Failed to open manifest for reading: ' . $manifest_path);
     151        }
     152        &printDebug('filepath: ' . $filepath);
     153        $data->{$filepath} = {'taskid' => $taskid,
     154                              'attempt' => $attempt_no,
     155                              'node' => $compute_node,
     156                              'splits' => $splits,
     157                              'original_splits' => ''};
     158        $record_count++;
     159      }
     160    }
     161    close(DLIN);
     162    print '   - read ' . $record_count . " records.\n";
     163  }
     164  else
     165  {
     166    die('Error! Failed to open file for reading: ' . $data_locality_csv_path);
     167  }
     168
     169  # 2. Read in hadoop log looking for split mapping information
     170  print "   * Read split information: hadoop.log\n";
     171  if (open(HLIN, '<:utf8', $hadoop_log_path))
     172  {
     173    my $record_count = 0;
     174    my $filepath = '';
     175    while (my $line = <HLIN>)
     176    {
     177      if ($line =~ /^Filepath:\s(.+)$/)
     178      {
     179        $filepath = $1;
     180      }
     181      elsif ($filepath ne '')
     182      {
     183        # this is the information that was unintentionally lost
     184        if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/)
     185        {
     186          my $original_splits = &parseSplits($1);
     187          if (!defined $data->{$filepath})
     188          {
     189            die('Found split information for file not in data locality log: ' . $filepath);
     190          }
     191          $data->{$filepath}->{'original_splits'} = $original_splits;
     192          $filepath = '';
     193          $record_count++;
     194        }
     195      }
     196    }
     197    close(HLIN);
     198    print '   - updated ' . $record_count . " records.\n";
     199  }
     200  else
     201  {
     202    die('Error! Failed to open file for reading: ' . $hadoop_log_path);
     203  }
     204
     205  # 3. Write out the updated data locality file including information about
     206  #    original splits (should we have found that information) and corrected
     207  #    data locality flag.
     208  # backup old
     209  my $backup_path = $data_locality_csv_path . '.bak';
     210  if (-f $backup_path)
     211  {
     212    unlink($backup_path);
     213  }
     214  rename($data_locality_csv_path, $backup_path);
     215  # write new
     216  print "   * Writing out updated file: data_locality.csv\n";
     217  if (open(DLOUT, '>:utf8', $data_locality_csv_path))
     218  {
     219    my $record_count = 0;
     220    print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n";
     221    foreach my $filepath (keys %{$data})
     222    {
     223      my $record = $data->{$filepath};
     224      my $compute_node = $record->{'node'};
     225      my @dl_nodes = split(',', $record->{'original_splits'});
     226      my $data_local = 0;
     227      foreach my $dl_node (@dl_nodes)
     228      {
     229        if ($dl_node eq $compute_node)
     230        {
     231          $data_local = 1;
     232        }
     233      }
     234      print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n";
     235      $record_count++;
     236    }
     237    close(DLOUT);
     238    print '   - wrote ' . $record_count . " records\n";
     239  }
     240  else
     241  {
     242    die('Error! Failed to open file for writing: ' . $data_locality_csv_path);
     243  }
     244}
     245## updateDataLocalityForDirectory() ##
     246
     247
     248## @function parseSplits()
     249#
    146250sub parseSplits
    147251{
     
    154258{
    155259  my ($msg) = @_;
    156   print '[DEBUG] ' . $msg . "\n";
     260  if ($debug)
     261  {
     262    chomp($msg);
     263    print '[DEBUG] ' . $msg . "\n";
     264  }
    157265}
    158266
Note: See TracChangeset for help on using the changeset viewer.