Changeset 28312

Show
Ignore:
Timestamp:
26.09.2013 11:13:14 (6 years ago)
Author:
jmt12
Message:

Working on finer control over data locality - so I can configure a run with no locality, for example

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest2.java

    r28192 r28312  
    1313import java.net.InetAddress; 
    1414import java.nio.channels.FileChannel; 
     15import java.util.Collection; 
    1516import java.util.Iterator; 
     17import java.util.List; 
    1618import java.util.Map; 
     19import java.util.StringTokenizer; 
    1720import java.util.regex.Matcher; 
    1821import java.util.regex.Pattern; 
     
    2124import org.apache.hadoop.conf.*; 
    2225import org.apache.hadoop.io.*; 
     26import org.apache.hadoop.mapred.ClusterStatus; 
     27import org.apache.hadoop.mapred.JobClient; 
     28import org.apache.hadoop.mapred.JobConf; 
    2329import org.apache.hadoop.mapreduce.*; 
    2430import org.apache.hadoop.mapreduce.Mapper.Context; 
     
    127133    extends FileInputFormat<Text, Text> 
    128134  { 
     135 
     136    private String[] getActiveServersList(JobContext context) 
     137    { 
     138      String [] servers = null; 
     139      try 
     140      { 
     141        JobClient jc = new JobClient((JobConf)context.getConfiguration()); 
     142        ClusterStatus status = jc.getClusterStatus(true); 
     143        Collection<String> atc = status.getActiveTrackerNames(); 
     144        servers = new String[atc.size()]; 
     145        int s = 0; 
     146        for (String serverInfo : atc) 
     147        { 
     148          StringTokenizer st = new StringTokenizer(serverInfo, ":"); 
     149          String trackerName = st.nextToken(); 
     150          StringTokenizer st1 = new StringTokenizer(trackerName, "_"); 
     151          st1.nextToken(); 
     152          servers[s++] = st1.nextToken(); 
     153        } 
     154      } 
     155      catch (IOException e) 
     156      { 
     157        e.printStackTrace(); 
     158      } 
     159      System.err.print("Servers: "); 
     160      String sep = ""; 
     161      for (Object obj : servers) 
     162      { 
     163        System.err.print(sep + obj.toString()); 
     164        sep = ", "; 
     165      } 
     166      System.err.println(""); 
     167      return servers; 
     168    } 
     169    /** getActiveServersList() **/ 
     170 
     171    /** 
     172     */ 
     173    public List<InputSplit> getSplits(JobContext job) 
     174      throws IOException 
     175    { 
     176      System.err.println("GSFileInputFormat::getSplits()"); 
     177      // get splits 
     178      List<InputSplit> original_splits = super.getSplits(job); 
     179      // Get active servers 
     180      String[] servers = getActiveServersList(job); 
     181      if(servers == null) 
     182      { 
     183        return null; 
     184      } 
     185      // done 
     186      System.err.println("Splits: "); 
     187      for (InputSplit obj : original_splits) 
     188      { 
     189        System.err.println(obj.toString()); 
     190      } 
     191      return original_splits; 
     192    } 
     193    /** getSplits() **/ 
     194 
    129195    /** 
    130196     *  Don't split the files