Changeset 28647

Show
Ignore:
Timestamp:
20.11.2013 12:49:26 (6 years ago)
Author:
jmt12
Message:

Adding progress messages and making a debug message optional

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/bin/script/update_data_locality.pl

    r28357 r28647  
    11#!/usr/bin/perl 
    22 
     3use Cwd; 
    34use Sort::Key::Natural qw(natsort); 
    45 
     
    1213print "============================================================\n\n"; 
    1314 
    14 if (!defined $ARGV[0] || !-d $ARGV[0]) 
     15our $debug = 0; 
     16 
     17# Loop through all the arguments, descending into directories to process data 
     18# locality files 
     19my $offset = 0; 
     20my $found_dirs = 0; 
     21while (defined $ARGV[$offset]) 
     22{ 
     23  my $argument = $ARGV[$offset]; 
     24  # handle -option arguments? 
     25  if ($argument =~ /^-(.+)$/) 
     26  { 
     27    my $option = $1; 
     28    if ($option eq 'debug') 
     29    { 
     30      $debug = 1; 
     31    } 
     32  } 
     33  # handle directories 
     34  else 
     35  { 
     36    my $path = getcwd() . '/' . $argument; 
     37    if (-d $path) 
     38    { 
     39      $found_dirs++; 
     40      &searchForDataLocalityFiles($path); 
     41    } 
     42  } 
     43  $offset++; 
     44} 
     45if ($found_dirs == 0) 
    1546{ 
    1647  &printUsage('Missing results directory'); 
    1748} 
    1849 
    19 # 0. Init 
    20 my $data = {}; 
    21 my $data_locality_csv_path = $ARGV[0] . '/data_locality.csv'; 
    22 my $hadoop_log_path = $ARGV[0] . '/hadoop.log'; 
    23  
    24 # 1. Read in current data locality comma separated file 
    25 print " * Read current file: data_locality.csv\n"; 
    26 if (open(DLIN, '<:utf8', $data_locality_csv_path)) 
    27 { 
    28   my $record_count = 0; 
    29   while (my $line = <DLIN>) 
    30   { 
    31     if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/) 
    32     { 
    33       my $taskid       = $1; 
    34       my $attempt_no   = $2; 
    35       my $compute_node = $3; 
    36       my $splits       = $4; 
    37  
    38       # Use the taskid to look up the manifest file and therein the filepath 
    39       my $filepath = ''; 
    40       my $manifest_path = $ARGV[0] . '/manifest' . $taskid . '_' . $attempt_no . '.xml'; 
    41       if (open(MIN, '<:utf8', $manifest_path)) 
    42       { 
    43         while (my $line2 = <MIN>) 
    44         { 
    45           if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/) 
    46           { 
    47             my $protocol = $1; 
    48             $filepath = 'hdfs:' . $2; 
    49           } 
    50         } 
    51         close(MIN); 
    52       } 
    53       else 
    54       { 
    55         die('Failed to open manifest for reading: ' . $manifest_path); 
    56       } 
    57  
    58       $data->{$filepath} = {'taskid' => $taskid, 
    59                             'attempt' => $attempt_no, 
    60                             'node' => $compute_node, 
    61                             'splits' => $splits, 
    62                             'original_splits' => ''}; 
    63       $record_count++; 
    64     } 
    65   } 
    66   close(DLIN); 
    67   print ' - read ' . $record_count . " records.\n"; 
    68 } 
    69 else 
    70 { 
    71  die('Error! Failed to open file for reading: ' . $data_locality_csv_path); 
    72 } 
    73  
    74 # 2. Read in hadoop log looking for split mapping information 
    75 print " * Read split information: hadoop.log\n"; 
    76 if (open(HLIN, '<:utf8', $hadoop_log_path)) 
    77 { 
    78   my $record_count = 0; 
    79   my $filepath = ''; 
    80   while (my $line = <HLIN>) 
    81   { 
    82     if ($line =~ /^Filepath:\s(.+)$/) 
    83     { 
    84       $filepath = $1; 
    85     } 
    86     elsif ($filepath ne '') 
    87     { 
    88       # this is the information that was unintentionally lost 
    89       if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/) 
    90       { 
    91         my $original_splits = &parseSplits($1); 
    92         if (!defined $data->{$filepath}) 
    93         { 
    94           die('Found split information for file not in data locality log: ' . $filepath); 
    95         } 
    96         $data->{$filepath}->{'original_splits'} = $original_splits; 
    97         $filepath = ''; 
    98         $record_count++; 
    99       } 
    100     } 
    101   } 
    102   close(HLIN); 
    103   print ' - updated ' . $record_count . " records.\n"; 
    104 } 
    105 else 
    106 { 
    107  die('Error! Failed to open file for reading: ' . $hadoop_log_path); 
    108 } 
    109  
    110 # 3. Write out the updated data locality file including information about 
    111 #    original splits (should we have found that information) and corrected 
    112 #    data locality flag. 
    113 print " * Writing out updated file: data_locality.csv\n"; 
    114 if (open(DLOUT, '>:utf8', $data_locality_csv_path)) 
    115 { 
    116   my $record_count = 0; 
    117   print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n"; 
    118   foreach my $filepath (keys %{$data}) 
    119   { 
    120     my $record = $data->{$filepath}; 
    121     my $compute_node = $record->{'node'}; 
    122     my @dl_nodes = split(',', $record->{'original_splits'}); 
    123     my $data_local = 0; 
    124     foreach my $dl_node (@dl_nodes) 
    125     { 
    126       if ($dl_node eq $compute_node) 
    127       { 
    128         $data_local = 1; 
    129       } 
    130     } 
    131     print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n"; 
    132     $record_count++; 
    133   } 
    134   close(DLOUT); 
    135   print ' - wrote ' . $record_count . " records\n"; 
    136 } 
    137 else 
    138 { 
    139   die('Error! Failed to open file for writing: ' . $data_locality_csv_path); 
    140 } 
    141  
    142 # 4. Complete 
     50# Complete! 
    14351print "\n========================= Complete =========================\n\n"; 
    14452exit; 
    14553 
     54################################################################################ 
     55#################################  Functions  ################################## 
     56################################################################################ 
     57 
     58 
     59## @function searchForDataLocalityFiles() 
     60# 
     61sub searchForDataLocalityFiles 
     62{ 
     63  my ($dir) = @_; 
     64  # All of the following is *only* valid for directories 
     65  if (-d $dir) 
     66  { 
     67    print ' * Searching for Data Locality information: ' . $dir . "\n"; 
     68    # Does this directory contain data_locality.csv? 
     69    my $dl_csv_path = $dir . '/data_locality.csv'; 
     70    if (-f $dl_csv_path) 
     71    { 
     72      &updateDataLocalityForDirectory($dir); 
     73    } 
     74    # Also descend into child directories 
     75    if (opendir(DIR, $dir)) 
     76    { 
     77      my @files = readdir(DIR); 
     78      closedir(DIR); 
     79      foreach my $file (@files) 
     80      { 
     81        my $path = $dir . '/' . $file; 
     82        if ($file =~ /^\./) 
     83        { 
     84        } 
     85        elsif (-d $path) 
     86        { 
     87          &searchForDataLocalityFiles($path); 
     88        } 
     89      } 
     90    } 
     91    else 
     92    { 
     93      die('Error! Failed to open directory for reading: ' . $dir); 
     94    } 
     95  } 
     96} 
     97## searchForDataLocalityFiles() ## 
     98 
     99 
     100sub updateDataLocalityForDirectory 
     101{ 
     102  my ($dir_prefix) = @_; 
     103  # 0. Init 
     104  my $data = {}; 
     105  my $data_locality_csv_path = $dir_prefix . '/data_locality.csv'; 
     106  my $hadoop_log_path = $dir_prefix . '/hadoop.log'; 
     107 
     108  # 1. Read in current data locality comma separated file 
     109  print "   * Read current file: data_locality.csv\n"; 
     110  if (open(DLIN, '<:utf8', $data_locality_csv_path)) 
     111  { 
     112    my $record_count = 0; 
     113    while (my $line = <DLIN>) 
     114    { 
     115      &printDebug($line); 
     116      # If the line already indicates this csv has been updated (by the 
     117      # presence of an "OriginalSplits" column) we don't need to do anything 
     118      # else 
     119      if ($line =~ /OriginalSplits/) 
     120      { 
     121        print "   ! File already updated...\n"; 
     122        close(DLIN); 
     123        return; 
     124      } 
     125      if ($line =~ /^([^,]+),(\d),\d,"([^"]+)","([^"]+)"$/) 
     126      { 
     127        my $taskid       = $1; 
     128        my $attempt_no   = $2; 
     129        my $compute_node = $3; 
     130        my $splits       = $4; 
     131 
     132        # Use the taskid to look up the manifest file and therein the filepath 
     133        my $filepath = ''; 
     134        my $manifest_path = $dir_prefix . '/manifest' . $taskid . '_' . $attempt_no . '.xml'; 
     135        &printDebug('searching for manifest file: ' . $manifest_path); 
     136        if (open(MIN, '<:utf8', $manifest_path)) 
     137        { 
     138          while (my $line2 = <MIN>) 
     139          { 
     140            if ($line2 =~ /<Filename>(hdfs|HDFSShell|ThriftFS):(.*)<\/Filename>/) 
     141            { 
     142              my $protocol = $1; 
     143              $filepath = 'hdfs:' . $2; 
     144            } 
     145          } 
     146          close(MIN); 
     147        } 
     148        else 
     149        { 
     150          die('Failed to open manifest for reading: ' . $manifest_path); 
     151        } 
     152        &printDebug('filepath: ' . $filepath); 
     153        $data->{$filepath} = {'taskid' => $taskid, 
     154                              'attempt' => $attempt_no, 
     155                              'node' => $compute_node, 
     156                              'splits' => $splits, 
     157                              'original_splits' => ''}; 
     158        $record_count++; 
     159      } 
     160    } 
     161    close(DLIN); 
     162    print '   - read ' . $record_count . " records.\n"; 
     163  } 
     164  else 
     165  { 
     166    die('Error! Failed to open file for reading: ' . $data_locality_csv_path); 
     167  } 
     168 
     169  # 2. Read in hadoop log looking for split mapping information 
     170  print "   * Read split information: hadoop.log\n"; 
     171  if (open(HLIN, '<:utf8', $hadoop_log_path)) 
     172  { 
     173    my $record_count = 0; 
     174    my $filepath = ''; 
     175    while (my $line = <HLIN>) 
     176    { 
     177      if ($line =~ /^Filepath:\s(.+)$/) 
     178      { 
     179        $filepath = $1; 
     180      } 
     181      elsif ($filepath ne '') 
     182      { 
     183        # this is the information that was unintentionally lost 
     184        if ($line =~ /^\s-\soriginal\ssplit\s\[(.+)\]$/) 
     185        { 
     186          my $original_splits = &parseSplits($1); 
     187          if (!defined $data->{$filepath}) 
     188          { 
     189            die('Found split information for file not in data locality log: ' . $filepath); 
     190          } 
     191          $data->{$filepath}->{'original_splits'} = $original_splits; 
     192          $filepath = ''; 
     193          $record_count++; 
     194        } 
     195      } 
     196    } 
     197    close(HLIN); 
     198    print '   - updated ' . $record_count . " records.\n"; 
     199  } 
     200  else 
     201  { 
     202    die('Error! Failed to open file for reading: ' . $hadoop_log_path); 
     203  } 
     204 
     205  # 3. Write out the updated data locality file including information about 
     206  #    original splits (should we have found that information) and corrected 
     207  #    data locality flag. 
     208  # backup old 
     209  my $backup_path = $data_locality_csv_path . '.bak'; 
     210  if (-f $backup_path) 
     211  { 
     212    unlink($backup_path); 
     213  } 
     214  rename($data_locality_csv_path, $backup_path); 
     215  # write new 
     216  print "   * Writing out updated file: data_locality.csv\n"; 
     217  if (open(DLOUT, '>:utf8', $data_locality_csv_path)) 
     218  { 
     219    my $record_count = 0; 
     220    print DLOUT "TaskID,AttemptNo,Data Local,Compute Node,Splits,OriginalSplits\n"; 
     221    foreach my $filepath (keys %{$data}) 
     222    { 
     223      my $record = $data->{$filepath}; 
     224      my $compute_node = $record->{'node'}; 
     225      my @dl_nodes = split(',', $record->{'original_splits'}); 
     226      my $data_local = 0; 
     227      foreach my $dl_node (@dl_nodes) 
     228      { 
     229        if ($dl_node eq $compute_node) 
     230        { 
     231          $data_local = 1; 
     232        } 
     233      } 
     234      print DLOUT $record->{'taskid'} . ',' . $record->{'attempt'} . ',' . $data_local . ',"' . $compute_node . '","' . $record->{'splits'} . '","' . $record->{'original_splits'} . '"' . "\n"; 
     235      $record_count++; 
     236    } 
     237    close(DLOUT); 
     238    print '   - wrote ' . $record_count . " records\n"; 
     239  } 
     240  else 
     241  { 
     242    die('Error! Failed to open file for writing: ' . $data_locality_csv_path); 
     243  } 
     244} 
     245## updateDataLocalityForDirectory() ## 
     246 
     247 
     248## @function parseSplits() 
     249# 
    146250sub parseSplits 
    147251{ 
     
    154258{ 
    155259  my ($msg) = @_; 
    156   print '[DEBUG] ' . $msg . "\n"; 
     260  if ($debug) 
     261  { 
     262    chomp($msg); 
     263    print '[DEBUG] ' . $msg . "\n"; 
     264  } 
    157265} 
    158266