Ignore:
Timestamp:
2013-06-18T10:59:52+12:00 (11 years ago)
Author:
jmt12
Message:

Add the ability to stagger the starting of Mappers by placing a 'delay.me' file in the tmp directory of the compute node to delay. They will then be initially delayed by compute node number * 100 seconds

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java

    r27641 r27654  
    1616import java.net.InetAddress;
    1717import java.util.Map;
     18import java.util.regex.Matcher;
     19import java.util.regex.Pattern;
    1820
    1921import org.apache.hadoop.fs.Path;
     
    190192      // - open a unique log file
    191193      File import_process_log = new File("/tmp/greenstone/import-hadoop-" + task_id + ".log");
     194      FileWriter fw1 = new FileWriter(import_process_log, true);
     195      // MEDUSA Customization: Introduce a slight delay based upon the hostname
     196      // in order to stagger the startup of Map workers. It looks like the avg
     197      // IO is around 25 minutes... so lets try to make it so the last mapper
     198      // starts up 25 minutes after the first (with all others spread in
     199      // between).
     200      String hostname = InetAddress.getLocalHost().getHostName();
     201      // We only do this if there is a sentinel file lurking in tmp
     202      try
     203      {
     204        File delay_file = new File("/tmp/greenstone/delay.me");
     205        if (delay_file.exists())
     206        {
     207          Pattern p = Pattern.compile("compute-0-([0-9]+).local");
     208          Matcher m = p.matcher(hostname);
     209          if (m.matches())
     210          {
     211            String node_str = m.group(1);
     212            int node_number = Integer.parseInt(node_str) * 100;
     213            fw1.write("[DEBUG] Delaying start for " + node_number + " seconds");
     214            Thread.currentThread().sleep(1000 * node_number);
     215          }
     216          // We only do this once for each compute node
     217          delay_file.delete();
     218        }
     219      }
     220      catch (Exception ie)
     221      {
     222        System.err.println(ie.toString());
     223      }
     224
    192225      // - start the log by writing the time and the manifest line
    193       FileWriter fw1 = new FileWriter(import_process_log, true);
    194226      long start_time = System.currentTimeMillis()/1000;
    195227      StringBuffer header_block = new StringBuffer("[Started:");
    196228      header_block.append(start_time);
    197229      header_block.append("]\n[Host:");
    198       header_block.append(InetAddress.getLocalHost().getHostName());
     230      header_block.append(hostname);
    199231      header_block.append("]\n[CPU:");
    200232      String getcpu_executable_cmd = gsdlhome + "/ext/parallel-building/linux/bin/getcpu";
Note: See TracChangeset for help on using the changeset viewer.