Ignore:
Timestamp:
2013-06-05T13:02:06+12:00 (11 years ago)
Author:
jmt12
Message:

Adding the ability for the Hadoop Mapper to determine what CPU number it is running on (at that moment - best I can do) - this is useful for multicore machines as otherwise there is no way to tell the 'worker processes' apart

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest.java

    r27494 r27546  
    169169      String task_id = conf.get("mapred.task.id");
    170170      task_id = task_id.substring(8); // remove "attempt_" prefix
     171      // Programatically rewrite the protocol as appropriate for the given
     172      // archives directory
     173      file_path = file_path.replace("hdfs://", hdfs_prefix);
    171174      // - create a temporary directory
    172175      File greenstone_tmp_dir = new File("/tmp/greenstone");
     
    180183      FileWriter fw1 = new FileWriter(import_process_log, true);
    181184      long start_time = System.currentTimeMillis()/1000;
    182       fw1.write("[Started:" + start_time + "]\n");
    183       fw1.write("[Host:" + InetAddress.getLocalHost().getHostName() + "]\n");
    184       fw1.write("[Task:" + task_id + "]\n");
    185 
    186       // Programatically rewrite the protocol as appropriate for the given
    187       // archives directory
    188       file_path = file_path.replace("hdfs://", hdfs_prefix);
    189       fw1.write("[Map:" + file_path + " => " + value + "]\n");
     185      StringBuffer header_block = new StringBuffer("[Started:");
     186      header_block.append(start_time);
     187      header_block.append("]\n[Host:");
     188      header_block.append(InetAddress.getLocalHost().getHostName());
     189      header_block.append("]\n[CPU:");
     190      header_block.append(runCommand("getcpu"));
     191      header_block.append("]\n[Task:");
     192      header_block.append(task_id);
     193      header_block.append("]\n[Map:");
     194      header_block.append(file_path);
     195      header_block.append("=>");
     196      header_block.append(value);
     197      header_block.append("]\n");
     198      fw1.write(header_block.toString());
     199      header_block = null;
    190200
    191201      // - create a temporary manifest file to process this file. Overwrite any
     
    338348  }
    339349  /** main(String[]) **/
     350
     351  /** @function runCommand()
     352   *
     353   * A convenience method that calls an external command and returns its
     354   * standard out as a string. Warning! Not safe if the command could return a
     355   * large amount of text in the STDERR stream - may infinitely block.
     356   *
     357   */
     358  public static String runCommand(String command)
     359  {
     360    StringBuffer result = new StringBuffer();
     361    try
     362    {
     363      Runtime run = Runtime.getRuntime() ;
     364      Process pr = run.exec(command) ;
     365      pr.waitFor() ;
     366      BufferedReader buf = new BufferedReader( new InputStreamReader( pr.getInputStream() ) ) ;
     367      String line;
     368      while ( ( line = buf.readLine() ) != null )
     369      {
     370        result.append(line);
     371      }
     372    }
     373    catch (Exception ex)
     374    {
     375      System.err.println("Error! " + ex.getMessage());
     376    }
     377    return result.toString();
     378  }
     379  /** runCommand() **/
    340380}
    341381
Note: See TracChangeset for help on using the changeset viewer.