Changeset 27644 for gs2-extensions


Timestamp:
2013-06-18T10:31:34+12:00
Author:
jmt12
Message:

Extended to support HDFS access via NFS. This applies both to the call to Hadoop (which needed one extra argument stating the NFS-located archives directory) and to the import directory setup (it is much quicker to write to HDFS via NFS than via Java application calls).
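
As background to the message above, the following short sketch (not part of the changeset; the /hdfs mount point, the paths and the flag value are illustrative assumptions) contrasts the two ways of writing an import directory into HDFS that the commit switches between: a plain copy onto an NFS-mounted HDFS tree versus an upload through the hadoop command-line client, which starts a Java application for the call.

#!/usr/bin/perl
# Minimal sketch only: with -use_nfs the HDFS tree is reached through an NFS
# mount (assumed here to be /hdfs) using ordinary filesystem operations, while
# the default route shells out to the "hadoop fs" client (a JVM start-up per call).
use strict;
use warnings;

my $use_nfs      = 1;                                                           # as if -use_nfs was given
my $local_import = '/greenstone/collect/test/import';                           # hypothetical local path
my $hdfs_import  = 'hdfs://namenode:9000/user/jmt12/gsdl/collect/test/import';  # hypothetical HDFS URI
my $nfs_import   = '/hdfs/user/jmt12/gsdl/collect/test/import';                 # hypothetical NFS view of the same path

if ($use_nfs)
{
  # NFS route: recursive copy straight onto the mounted HDFS directory
  system('cp -r "' . $local_import . '/." "' . $nfs_import . '"') == 0
    or die "NFS copy failed\n";
}
else
{
  # Java application route: the hadoop client uploads the directory into HDFS
  system('hadoop fs -put "' . $local_import . '" "' . $hdfs_import . '"') == 0
    or die "hadoop fs -put failed\n";
}

Copying over NFS avoids the client start-up cost that a hadoop invocation pays on every call, which is the speed-up the message refers to.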

File:
1 edited

Legend:

  ' '  Unmodified (context)
  '+'  Added
  '-'  Removed
  • gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl

--- gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl (r27594)
+++ gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl (r27644)
@@ -6,5 +6,5 @@
 BEGIN
 {
-  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
+  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
   die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
   die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
@@ -19,7 +19,10 @@
 my $collection = 'test';
 my $use_thrift = 1;
-my $start_thrift = 1;
+my $start_thrift = 0;
 my $debug = 0;
 my $dry_run = 0;
+my $flush_diskcache = 0;
+my $use_nfs = 0;
+
 my $gsdl_home = $ENV{'GSDLHOME'};
 my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
@@ -35,5 +38,5 @@

 # 1. Read and validate parameters
-if (defined $ARGV[0])
+if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
 {
   $collection = $ARGV[0];
@@ -41,5 +44,6 @@
 else
 {
-  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] \n\n";
+  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n\n";
+  exit;
 }
 my $offset = 1;
@@ -62,4 +66,16 @@
     $refresh_import = 1;
   }
+  if ($ARGV[$offset] eq '-flush_diskcache')
+  {
+    $flush_diskcache = 1;
+  }
+  if ($ARGV[$offset] eq '-start_thrift')
+  {
+    $start_thrift = 1;
+  }
+  if ($ARGV[$offset] eq '-use_nfs')
+  {
+    $use_nfs = 1;
+  }
   $offset++;
 }
@@ -68,4 +84,8 @@
 {
   $hdfs_fs_prefix = 'HDFSShell://';
+}
+if ($use_nfs)
+{
+  $hdfs_fs_prefix = '/hdfs';
 }

@@ -87,13 +107,34 @@
 }
 # - directories within HDFS
-#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
-my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
-#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
-my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
+my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
+print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
+my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
+if ($use_nfs)
+{
+  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
+}
+my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
+print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
+my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
+if ($use_nfs)
+{
+  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
+}

 # 2. Copy the import directory into HDFS
 print " * Replicating import directory in HDFS...";
 # - check if import directory already exists
-my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
+my $hdfs_import_exists = 0;
+if ($use_nfs)
+{
+  if (-d $nfs_input_dir)
+  {
+    $hdfs_import_exists = 1;
+  }
+}
+else
+{
+  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
+}
 if ($refresh_import || !$hdfs_import_exists)
 {
@@ -101,9 +142,24 @@
   if ($hdfs_import_exists)
   {
-    &hdfsCommand('rmr', $hdfs_input_dir);
+    if ($use_nfs)
+    {
+      &recursiveDelete($nfs_input_dir, '/hdfs');
+    }
+    else
+    {
+      &hdfsCommand('rmr', $hdfs_input_dir);
+    }
   }
   # - now recursively copy the contents of import directory into HDFS ensuring
   #   that relative paths are maintained
-  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
+  my $file_count = 0;
+  if ($use_nfs)
+  {
+    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
+  }
+  else
+  {
+    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
+  }
   &debugPrint($file_count . " files 'putted'");
   print "Done!\n";
@@ -113,4 +169,5 @@
   print "Already exists!\n";
 }
+
 # - clear out the archives regardless
 my $gs_archives_dir = $gs_collection_dir . '/archives';
@@ -119,9 +176,21 @@
 {
   print " * Clearing existing archives directory for this collection... ";
-  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
+  &recursiveDelete($gs_archives_dir, $gsdl_home);
   $deleted_archives = 1;
 }
 mkdir($gs_archives_dir, 0755);
-if (&hdfsTest('d', 0, $hdfs_output_dir))
+my $hdfs_archives_exists = 0;
+if ($use_nfs)
+{
+  if (-d $nfs_output_dir)
+  {
+    $hdfs_archives_exists = 1;
+  }
+}
+else
+{
+  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir)
+}
+if ($hdfs_archives_exists)
 {
   if (!$deleted_archives)
@@ -129,5 +198,12 @@
     print " * Clearing existing archives directory for this collection... ";
   }
-  &hdfsCommand('rmr', $hdfs_output_dir);
+  if ($use_nfs)
+  {
+    &recursiveDelete($nfs_output_dir, '/hdfs');
+  }
+  else
+  {
+    &hdfsCommand('rmr', $hdfs_output_dir);
+  }
   $deleted_archives = 1;
 }
@@ -136,4 +212,5 @@
   print "Done!\n";
 }
+
 # - watch for cached directories for Media based collections
 my $gs_cached_dir = $gs_collection_dir . '/cached';
@@ -141,5 +218,5 @@
 {
   print " * Clearing existing cached media directory for this collection... ";
-  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
+  &recursiveDelete($gs_cached_dir, $gsdl_home);
   print "Done!\n";
 }
@@ -150,5 +227,5 @@
 if (!&dirIsEmpty($gs_logs_dir))
 {
-  &shellCommand('rm -f ' . $gs_logs_dir . '/*.*');
+  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
 }
 if (!&dirIsEmpty('/tmp/greenstone'))
@@ -156,4 +233,5 @@
   &shellCommand('rm -f /tmp/greenstone/*.*');
   &shellCommand('rm -rf /tmp/gsimport*');
+  &shellCommand('rm -rf /tmp/thrift');
 }
 if ($is_rocks_cluster)
@@ -161,15 +239,19 @@
   &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
   &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
+  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
 }
 print "Done!\n";

 # - flush DNS cache too, so we are playing on a level field
-print " * Flushing disk cache... ";
-&shellCommand('flush_caches.pl');
-if ($is_rocks_cluster)
-{
-  &shellCommand('rocks run host "flush_caches.pl"');
-}
-print "Done!\n";
+if ($flush_diskcache)
+{
+  print " * Flushing disk cache... ";
+  &shellCommand('flush_caches.pl');
+  if ($is_rocks_cluster)
+  {
+    &shellCommand('rocks run host "flush_caches.pl"');
+  }
+  print "Done!\n";
+}

 # 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
@@ -218,5 +300,5 @@
   {
     print " * Starting Thrift Servers (on compute nodes)... ";
-    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"');
+    print "[DEBUG]\n" . &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"') . "\n\n";
   }
   # single server
@@ -227,4 +309,15 @@
   }
   print "Done!\n";
+}
+
+my $actual_archives_dir;
+if ($use_nfs)
+{
+  $actual_archives_dir = $nfs_output_dir;
+}
+else
+{
+  $actual_archives_dir = $hdfs_output_dir;
+  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
 }

@@ -236,6 +329,14 @@
 print " * Running import using Hadoop...";
 my $hadoop_log = $gs_results_dir . '/hadoop.log';
-&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
-my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' .  $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
+&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
+my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
+$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
+$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
+$hadoop_command .= $collection . ' '; # The collection name
+$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
+$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
+$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
+$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
+$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
 &shellCommand($hadoop_command);
 print "Done!\n";
@@ -351,6 +452,7 @@
 # /** printUsage() **/

-# /**
-#  */
+
+## @function recursiveCopy()
+#
 sub recursiveCopy
 {
@@ -358,5 +460,12 @@
   my $file_count = 0;
   # - create the directory in HDFS
-  &hdfsCommand('mkdir', $hdfs_dir);
+  if ($use_nfs)
+  {
+    &shellCommand('mkdir "' . $hdfs_dir . '"');
+  }
+  else
+  {
+    &hdfsCommand('mkdir', $hdfs_dir);
+  }
   # - search $src_dir for files
   opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
@@ -379,5 +488,12 @@
       {
         my $hdfs_path = $hdfs_dir . '/' . $file;
-        &hdfsCommand('put', $src_path, $hdfs_path);
+        if ($use_nfs)
+        {
+          &shellCommand('cp "' . $src_path . '" "' . $hdfs_path . '"');
+        }
+        else
+        {
+          &hdfsCommand('put', $src_path, $hdfs_path);
+        }
         $file_count++;
       }
@@ -386,5 +502,6 @@
   return $file_count;
 }
-# /** recursiveCopy() **/
+## recursiveCopy() ##
+

 # /** @function shellCommand
@@ -427,2 +544,15 @@
 }
 # /** dirIsEmpty() **/
+
+
+## @function recursiveDelete()
+#
+sub recursiveDelete
+{
+  my ($dir, $prefix) = @_;
+  if ($dir =~ /^$prefix/)
+  {
+    &shellCommand('rm -rf "' . $dir . '"');
+  }
+}
+## recursiveDelete() ##