Ignore:
Timestamp:
2016-12-28T14:04:19+13:00 (7 years ago)
Author:
davidb
Message:

Updating of POS code to new files-per-partition paramater, plus some other related tweaks

File:
1 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForPOSCount.java

    r31264 r31271  
    2727    //   http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
    2828   
    29     protected static final int DEFAULT_NUM_CORES = 6;
    30     protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;
     29    //protected static final int DEFAULT_NUM_CORES = 6;
     30    //protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;
     31    protected static final int DEFAULT_FILES_PER_PARTITION = 3000;
    3132   
    3233    protected String _input_dir;
     
    5354    public void execPOSCount()
    5455    {   
    55         String spark_app_name = generateSparkAppName("Per Page");       
     56        String spark_app_name = generateSparkAppName("Per Volume");     
    5657       
    5758        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     
    6768        }
    6869       
    69         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
    70         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache();
     70        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
     71        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION);
     72       
     73       
     74        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
    7175        json_list_data.setName("JSON-file-list");
    7276       
     
    7478        double per_vol = 100.0/(double)num_volumes;
    7579
     80        int num_partitions = (int)(num_volumes/files_per_partition)+1;
     81        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
     82        json_list_data_rp.setName("JSON-file-list--repartitioned");
     83       
    7684        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");
    7785       
    7886        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
    79         //boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize");
    8087       
    8188        PerVolumePOSStreamFlatmap paged_solr_posfreq_flatmap
     
    8390                                     per_vol_progress_accum,per_vol,
    8491                                     strict_file_io);
    85         JavaRDD<String> pos_list = json_list_data.flatMap(paged_solr_posfreq_flatmap);
     92        JavaRDD<String> pos_list = json_list_data_rp.flatMap(paged_solr_posfreq_flatmap);
    8693        pos_list.setName("pos-stream");
    8794       
Note: See TracChangeset for help on using the changeset viewer.