Changeset 31270 for other-projects

Show
Ignore:
Timestamp:
28.12.2016 10:36:17 (2 years ago)
Author:
davidb
Message:

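Changed over to a repartition approach: instead of a fixed default number of partitions, the JSON file list is now repartitioned so that each partition covers roughly DEFAULT_FILES_PER_PARTITION (3000 by default) files.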

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForLangCount.java

    r31264 r31270  
    2727    //   http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ 
    2828     
    29     protected static final int DEFAULT_NUM_CORES = 6; 
    30     protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;  
     29    //protected static final int DEFAULT_NUM_CORES = 6; 
     30    //protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;  
     31    protected static final int DEFAULT_FILES_PER_PARTITION = 3000; 
    3132     
    3233    protected String _input_dir; 
     
    5354    public void execLangCount() 
    5455    {    
    55         String spark_app_name = generateSparkAppName("Per Page");        
     56        String spark_app_name = generateSparkAppName("Per Volume");      
    5657         
    5758        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
     
    6768        } 
    6869         
    69         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
    70         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 
     70        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
     71        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 
     72         
     73         
     74        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,files_per_partition).cache(); 
    7175        json_list_data.setName("JSON-file-list"); 
    7276         
     
    7478        double per_vol = 100.0/(double)num_volumes; 
    7579 
     80        int num_partitions = (int)(num_volumes/files_per_partition)+1; 
     81         
     82        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 
     83         
    7684        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); 
    7785         
     
    8290                                     per_vol_progress_accum,per_vol, 
    8391                                     strict_file_io); 
    84         JavaRDD<String> lang_list = json_list_data.flatMap(paged_solr_langfreq_flatmap);  
     92        JavaRDD<String> lang_list = json_list_data_rp.flatMap(paged_solr_langfreq_flatmap);  
    8593        lang_list.setName("lang-stream"); 
    8694