Changeset 28312


Ignore:
Timestamp:
09/26/13 11:13:14 (8 years ago)
Author:
jmt12
Message:

Working on finer control over data locality - so I can configure a run with no locality, for example

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/src/java/org/nzdl/gsdl/HadoopGreenstoneIngest2.java

    r28192 r28312  
    1313import java.net.InetAddress;
    1414import java.nio.channels.FileChannel;
     15import java.util.Collection;
    1516import java.util.Iterator;
     17import java.util.List;
    1618import java.util.Map;
     19import java.util.StringTokenizer;
    1720import java.util.regex.Matcher;
    1821import java.util.regex.Pattern;
     
    2124import org.apache.hadoop.conf.*;
    2225import org.apache.hadoop.io.*;
     26import org.apache.hadoop.mapred.ClusterStatus;
     27import org.apache.hadoop.mapred.JobClient;
     28import org.apache.hadoop.mapred.JobConf;
    2329import org.apache.hadoop.mapreduce.*;
    2430import org.apache.hadoop.mapreduce.Mapper.Context;
     
    127133    extends FileInputFormat<Text, Text>
    128134  {
     135
     136    private String[] getActiveServersList(JobContext context)
     137    {
     138      String [] servers = null;
     139      try
     140      {
     141        JobClient jc = new JobClient((JobConf)context.getConfiguration());
     142        ClusterStatus status = jc.getClusterStatus(true);
     143        Collection<String> atc = status.getActiveTrackerNames();
     144        servers = new String[atc.size()];
     145        int s = 0;
     146        for (String serverInfo : atc)
     147        {
     148          StringTokenizer st = new StringTokenizer(serverInfo, ":");
     149          String trackerName = st.nextToken();
     150          StringTokenizer st1 = new StringTokenizer(trackerName, "_");
     151          st1.nextToken();
     152          servers[s++] = st1.nextToken();
     153        }
     154      }
     155      catch (IOException e)
     156      {
     157        e.printStackTrace();
     158      }
     159      System.err.print("Servers: ");
     160      String sep = "";
     161      for (Object obj : servers)
     162      {
     163        System.err.print(sep + obj.toString());
     164        sep = ", ";
     165      }
     166      System.err.println("");
     167      return servers;
     168    }
     169    /** getActiveServersList() **/
     170
     171    /**
     172     */
     173    public List<InputSplit> getSplits(JobContext job)
     174      throws IOException
     175    {
     176      System.err.println("GSFileInputFormat::getSplits()");
     177      // get splits
     178      List<InputSplit> original_splits = super.getSplits(job);
     179      // Get active servers
     180      String[] servers = getActiveServersList(job);
     181      if(servers == null)
     182      {
     183        return null;
     184      }
     185      // done
     186      System.err.println("Splits: ");
     187      for (InputSplit obj : original_splits)
     188      {
     189        System.err.println(obj.toString());
     190      }
     191      return original_splits;
     192    }
     193    /** getSplits() **/
     194
    129195    /**
    130196     *  Don't split the files
Note: See TracChangeset for help on using the changeset viewer.