Ignore:
Timestamp:
2013-06-18T10:59:52+12:00 (11 years ago)
Author:
jmt12
Message:

Add the ability to stagger the starting of Mappers by placing a 'delay.me' file in the tmp directory of each compute node that should be delayed. Mappers on such a node will then be initially delayed by (compute node number * 100) seconds

Location:
gs2-extensions/parallel-building/trunk/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl

    r27644 r27654  
    2222my $debug = 0;
    2323my $dry_run = 0;
     24my $stagger = 0;
    2425my $flush_diskcache = 0;
    2526my $use_nfs = 0;
     
    4445else
    4546{
    46   print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n\n";
     47  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
     48  print STDERR "where: [debug] print more debug messages to STDERR\n";
     49  print STDERR "       [dry_run] don't actually perform an file actions\n";
    4750  exit;
    4851}
     
    6568  {
    6669    $refresh_import = 1;
     70  }
     71  if ($ARGV[$offset] eq '-stagger')
     72  {
     73    $stagger = 1;
    6774  }
    6875  if ($ARGV[$offset] eq '-flush_diskcache')
     
    255262}
    256263
     264# - If we've been asked to Stagger start-up, add "delay.me" files to the
     265#   compute nodes
     266if ($is_rocks_cluster && $stagger)
     267{
     268  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
     269}
     270
    257271# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
    258272#    where we start the server now to ensure it lives on the head node
     
    295309
    296310# 3.5 Start up the thrift server(s) if we've been asked to
     311my $thrift_log = $gs_results_dir . '/thriftctl.log';
    297312if ($start_thrift)
    298313{
     
    300315  {
    301316    print " * Starting Thrift Servers (on compute nodes)... ";
    302     print "[DEBUG]\n" . &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"') . "\n\n";
     317    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
    303318  }
    304319  # single server
     
    306321  {
    307322    print " * Starting Thrift Server... ";
    308     &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start');
     323    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
    309324  }
    310325  print "Done!\n";
     
    362377  {
    363378    print " * Stopping Thrift Servers (on compute nodes)... ";
    364     &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop"');
     379    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
    365380  }
    366381  # single server
     
    368383  {
    369384    print " * Stoping Thrift Server... ";
    370     &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start');
     385    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start >> "' . $thrift_log . '" 2>&1');
    371386  }
    372387  print "Done!\n";
     
    384399  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
    385400}
     401if ($start_thrift && -d '/tmp/thrift')
     402{
     403  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
     404}
    386405# - remote files
    387406if ($is_rocks_cluster)
    388407{
    389408  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
    390 &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
     409  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
     410  if ($start_thrift)
     411  {
     412    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
     413  }
    391414}
    392415print "Done!\n";
    393 # - generate data locality report
     416
     417# - generate data locality report...
    394418&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
     419
     420# - hadoop report...
     421&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');
     422
     423# - and gantt chart
     424&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');
    395425
    396426# 7. Done - clean up
  • gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java

    r27641 r27654  
    1616import java.net.InetAddress;
    1717import java.util.Map;
     18import java.util.regex.Matcher;
     19import java.util.regex.Pattern;
    1820
    1921import org.apache.hadoop.fs.Path;
     
    190192      // - open a unique log file
    191193      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
     194      FileWriter fw1 = new FileWriter(import_process_log, true);
     195      // MEDUSA Customization: Introduce a slight delay based upon the hostname
     196      // in order to stagger the startup of Map workers. It looks like the avg
     197      // IO is around 25 minutes... so lets try to make it so the last mapper
     198      // starts up 25 minutes after the first (with all others spread in
     199      // between).
     200      String hostname = InetAddress.getLocalHost().getHostName();
     201      // We only do this if there is a sentinel file lurking in tmp
     202      try
     203      {
     204        File delay_file = new File("/tmp/greenstone/delay.me");
     205        if (delay_file.exists())
     206        {
     207          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
     208          Matcher m = p.matcher(hostname);
     209          if (m.matches())
     210          {
     211            String node_str = m.group(1);
     212            int node_number = Integer.parseInt(node_str) * 100;
     213            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
     214            Thread.currentThread().sleep(1000 * node_number);
     215          }
     216          // We only do this once for each compute node
     217          delay_file.delete();
     218        }
     219      }
     220      catch (Exception ie)
     221      {
     222        System.err.println(ie.toString());
     223      }
     224
    192225      // - start the log by writing the time and the manifest line
    193       FileWriter fw1 = new FileWriter(import_process_log, true);
    194226      long start_time = System.currentTimeMillis()/1000;
    195227      StringBuffer header_block = new StringBuffer("[Started:");
    196228      header_block.append(start_time);
    197229      header_block.append("]\n[Host:");
    198       header_block.append(InetAddress.getLocalHost().getHostName());
     230      header_block.append(hostname);
    199231      header_block.append("]\n[CPU:");
    200232      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
Note: See TracChangeset for help on using the changeset viewer.