Changeset 27644

Timestamp: 18.06.2013 10:31:34 (6 years ago)
Author: jmt12
Message:

Extended to support HDFS access via NFS. This applies both to the call to Hadoop (which needed one extra argument giving the NFS-located archives directory) and to the import directory setup (writing to HDFS via NFS is much quicker than going through Java application calls).
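
In effect the change threads a new -use_nfs flag through the script: when it is set, plain filesystem operations against an NFS mount of the HDFS namespace (assumed at /hdfs) replace hdfs:// URLs and Hadoop tooling. A minimal sketch of the selection pattern, with example values rather than the patch itself:

  # Sketch only: the /hdfs mount point, username and collection are examples.
  my $use_nfs = 1; # set by the new -use_nfs command line flag
  my ($username, $collection) = ('gsuser', 'test');
  my $archives_dir;
  if ($use_nfs)
  {
    # plain filesystem path through the NFS-mounted HDFS namespace
    $archives_dir = "/hdfs/user/$username/gsdl/collect/$collection/archives";
  }
  else
  {
    # native HDFS URL, addressed via Hadoop's own tooling
    $archives_dir = "hdfs://$ENV{'HDFSHOST'}:$ENV{'HDFSPORT'}/user/$username/gsdl/collect/$collection/archives";
  }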

Files: 1 modified

Legend:

  (no prefix) unmodified
  +           added
  -           removed
  • gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl

--- r27594
+++ r27644
 BEGIN
 {
-  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
+  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
   die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
   die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
     
 my $collection = 'test';
 my $use_thrift = 1;
-my $start_thrift = 1;
+my $start_thrift = 0;
 my $debug = 0;
 my $dry_run = 0;
+my $flush_diskcache = 0;
+my $use_nfs = 0;
+
 my $gsdl_home = $ENV{'GSDLHOME'};
 my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
     

 # 1. Read and validate parameters
-if (defined $ARGV[0])
+if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
 {
   $collection = $ARGV[0];
     
 else
 {
-  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] \n\n";
+  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n\n";
+  exit;
 }
 my $offset = 1;
     
     $refresh_import = 1;
   }
+  if ($ARGV[$offset] eq '-flush_diskcache')
+  {
+    $flush_diskcache = 1;
+  }
+  if ($ARGV[$offset] eq '-start_thrift')
+  {
+    $start_thrift = 1;
+  }
+  if ($ARGV[$offset] eq '-use_nfs')
+  {
+    $use_nfs = 1;
+  }
   $offset++;
 }
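
With the parsing above, the new flags can be combined freely after the collection name. Hypothetical invocations (collection name and flag combinations are illustrative only):

  # perl hadoop_import.pl test -use_nfs -flush_diskcache
  # perl hadoop_import.pl test -refresh_import -start_thrift -debug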
     
 {
   $hdfs_fs_prefix = 'HDFSShell://';
+}
+if ($use_nfs)
+{
+  $hdfs_fs_prefix = '/hdfs';
 }

     
 }
 # - directories within HDFS
-#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
-my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
-#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
-my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
+my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
+print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
+my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
+if ($use_nfs)
+{
+  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
+}
+my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
+print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
+my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
+if ($use_nfs)
+{
+  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
+}

 # 2. Copy the import directory into HDFS
 print " * Replicating import directory in HDFS...";
 # - check if import directory already exists
-my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
+my $hdfs_import_exists = 0;
+if ($use_nfs)
+{
+  if (-d $nfs_input_dir)
+  {
+    $hdfs_import_exists = 1;
+  }
+}
+else
+{
+  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
+}
 if ($refresh_import || !$hdfs_import_exists)
 {
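
(The NFS branches above assume the HDFS namespace is exported by an NFS gateway mounted at /hdfs. A hypothetical guard, not part of this changeset, could catch a missing mount before any copying starts:)

  # Hypothetical sanity check; the /hdfs mount point is an assumption.
  if ($use_nfs && !-d '/hdfs')
  {
    die "HDFS NFS mount not found at /hdfs - is the NFS gateway running?\n";
  }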
     
   if ($hdfs_import_exists)
   {
-    &hdfsCommand('rmr', $hdfs_input_dir);
+    if ($use_nfs)
+    {
+      &recursiveDelete($nfs_input_dir, '/hdfs');
+    }
+    else
+    {
+      &hdfsCommand('rmr', $hdfs_input_dir);
+    }
   }
   # - now recursively copy the contents of import directory into HDFS ensuring
   #   that relative paths are maintained
-  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
+  my $file_count = 0;
+  if ($use_nfs)
+  {
+    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
+  }
+  else
+  {
+    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
+  }
   &debugPrint($file_count . " files 'putted'");
   print "Done!\n";
     
   print "Already exists!\n";
 }
+
 # - clear out the archives regardless
 my $gs_archives_dir = $gs_collection_dir . '/archives';
     
 {
   print " * Clearing existing archives directory for this collection... ";
-  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
+  &recursiveDelete($gs_archives_dir, $gsdl_home);
   $deleted_archives = 1;
 }
 mkdir($gs_archives_dir, 0755);
-if (&hdfsTest('d', 0, $hdfs_output_dir))
+my $hdfs_archives_exists = 0;
+if ($use_nfs)
+{
+  if (-d $nfs_output_dir)
+  {
+    $hdfs_archives_exists = 1;
+  }
+}
+else
+{
+  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir)
+}
+if ($hdfs_archives_exists)
 {
   if (!$deleted_archives)
     
     print " * Clearing existing archives directory for this collection... ";
   }
-  &hdfsCommand('rmr', $hdfs_output_dir);
+  if ($use_nfs)
+  {
+    &recursiveDelete($nfs_output_dir, '/hdfs');
+  }
+  else
+  {
+    &hdfsCommand('rmr', $hdfs_output_dir);
+  }
   $deleted_archives = 1;
 }
     
   print "Done!\n";
 }
+
 # - watch for cached directories for Media based collections
 my $gs_cached_dir = $gs_collection_dir . '/cached';
     
 {
   print " * Clearing existing cached media directory for this collection... ";
-  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
+  &recursiveDelete($gs_cached_dir, $gsdl_home);
   print "Done!\n";
 }
     
 if (!&dirIsEmpty($gs_logs_dir))
 {
-  &shellCommand('rm -f ' . $gs_logs_dir . '/*.*');
+  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
 }
 if (!&dirIsEmpty('/tmp/greenstone'))
     
   &shellCommand('rm -f /tmp/greenstone/*.*');
   &shellCommand('rm -rf /tmp/gsimport*');
+  &shellCommand('rm -rf /tmp/thrift');
 }
 if ($is_rocks_cluster)
     
   &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
   &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
+  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
 }
 print "Done!\n";

 # - flush DNS cache too, so we are playing on a level field
-print " * Flushing disk cache... ";
-&shellCommand('flush_caches.pl');
-if ($is_rocks_cluster)
-{
-  &shellCommand('rocks run host "flush_caches.pl"');
-}
-print "Done!\n";
+if ($flush_diskcache)
+{
+  print " * Flushing disk cache... ";
+  &shellCommand('flush_caches.pl');
+  if ($is_rocks_cluster)
+  {
+    &shellCommand('rocks run host "flush_caches.pl"');
+  }
+  print "Done!\n";
+}

 # 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
     
   {
     print " * Starting Thrift Servers (on compute nodes)... ";
-    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"');
+    print "[DEBUG]\n" . &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"') . "\n\n";
   }
   # single server
     
   }
   print "Done!\n";
+}
+
+my $actual_archives_dir;
+if ($use_nfs)
+{
+  $actual_archives_dir = $nfs_output_dir;
+}
+else
+{
+  $actual_archives_dir = $hdfs_output_dir;
+  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
 }

     
 print " * Running import using Hadoop...";
 my $hadoop_log = $gs_results_dir . '/hadoop.log';
-&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
-my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' .  $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
+&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
+my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
+$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
+$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
+$hadoop_command .= $collection . ' '; # The collection name
+$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
+$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
+$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
+$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
+$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
 &shellCommand($hadoop_command);
 print "Done!\n";
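
Note that the ingest arguments were reordered as well as extended: the archives directory now travels as its own argument and the filesystem driver prefix moves after the collection name (the receiving HadoopGreenstoneIngest class presumably parses the new order). An illustrative expansion of $hadoop_command for a non-NFS run, with hostnames and paths as example values only:

  # hadoop jar .../lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest
  #   "/home/gsuser/greenstone"                                  ($gsdl_home)
  #   "/opt/hadoop"                                              ($HADOOP_PREFIX)
  #   test                                                       ($collection)
  #   "HDFSShell://namenode:9000/user/gsuser/gsdl/collect/test/archives"
  #   "HDFSShell://"                                             ($hdfs_fs_prefix)
  #   "hdfs://namenode:9000/user/gsuser/gsdl/collect/test/import"
  #   "hdfs://namenode:9000/user/gsuser/gsdl/collect/test/archives"
  #   >> .../hadoop.log 2>&1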
     
 # /** printUsage() **/

-# /**
-#  */
+
+## @function recursiveCopy()
+#
 sub recursiveCopy
 {
     
   my $file_count = 0;
   # - create the directory in HDFS
-  &hdfsCommand('mkdir', $hdfs_dir);
+  if ($use_nfs)
+  {
+    &shellCommand('mkdir "' . $hdfs_dir . '"');
+  }
+  else
+  {
+    &hdfsCommand('mkdir', $hdfs_dir);
+  }
   # - search $src_dir for files
   opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
     
       {
         my $hdfs_path = $hdfs_dir . '/' . $file;
-        &hdfsCommand('put', $src_path, $hdfs_path);
+        if ($use_nfs)
+        {
+          &shellCommand('cp "' . $src_path . '" "' . $hdfs_path . '"');
+        }
+        else
+        {
+          &hdfsCommand('put', $src_path, $hdfs_path);
+        }
         $file_count++;
       }
     
   return $file_count;
 }
-# /** recursiveCopy() **/
+## recursiveCopy() ##
+

 # /** @function shellCommand
     
 }
 # /** dirIsEmpty() **/
+
+
+## @function recursiveDelete()
+#
+sub recursiveDelete
+{
+  my ($dir, $prefix) = @_;
+  if ($dir =~ /^$prefix/)
+  {
+    &shellCommand('rm -rf "' . $dir . '"');
+  }
+}
+## recursiveDelete() ##
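
The new recursiveDelete() replaces the bare 'rm -rf' shell calls throughout the script: its $prefix argument acts as a guard so that only paths under a stated root are ever deleted. For example (hypothetical paths):

  # Path matches the /hdfs prefix, so the directory is removed.
  &recursiveDelete('/hdfs/user/gsuser/gsdl/collect/test/archives', '/hdfs');
  # No prefix match, so this call is silently a no-op.
  &recursiveDelete('/etc', '/hdfs');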