Changeset 31266 for other-projects

Show
Ignore:
Timestamp:
27.12.2016 18:51:42 (2 years ago)
Author:
davidb
Message:

Rekindling of per-volume approach. Also some tweaking to verbosity debug printing in per-page

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
3 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerPageJSONFlatmap.java

    r31252 r31266  
    105105                ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 
    106106            } 
    107             if (_verbosity >= 2) { 
     107            if (_verbosity >= 3) { 
    108108                System.out.print("  Pages: "); 
    109109            } 
     
    113113                String page_id = volume_id + "." + formatted_i; 
    114114 
    115                 if (_verbosity >= 2) { 
     115                if (_verbosity >= 3) { 
    116116                    if (i>0) { 
    117117                        System.out.print(", "); 
     
    123123                 
    124124                if (i==(ef_page_count-1)) { 
    125                     if (_verbosity >= 2) { 
     125                    if (_verbosity >= 3) { 
    126126                        System.out.println(); 
    127127                    } 
    128                     System.out.println("Sample output JSON page file: " + output_json_bz2); 
     128                    if (_verbosity >= 2) { 
     129                        System.out.println("Sample output JSON page file: " + output_json_bz2); 
     130                    } 
    129131                } 
    130132 
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeJSON.java

    r31252 r31266  
    22 
    33import java.io.IOException; 
     4import java.util.ArrayList; 
     5import java.util.Iterator; 
    46 
     7import org.apache.spark.api.java.function.FlatMapFunction; 
    58import org.apache.spark.api.java.function.VoidFunction; 
    69import org.apache.spark.util.DoubleAccumulator; 
     
    1821 
    1922 
    20 public class PerVolumeJSON implements VoidFunction<String>  
     23//public class PerVolumeJSON implements VoidFunction<String>  
     24public class PerVolumeJSON implements FlatMapFunction<String,String>  
    2125{ 
    2226    private static final long serialVersionUID = 1L; 
     
    5862    } 
    5963     
    60     //public Iterator<String> call(String json_file_in)  
    61     public void call(String json_file_in) throws IOException 
     64    //public void call(String json_file_in) throws IOException 
     65    public Iterator<String> call(String json_file_in) throws IOException 
     66     
    6267    {  
    6368        if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) { 
     
    6570        } 
    6671 
     72        ArrayList<String> ids = null; 
     73         
    6774        String full_json_file_in = _input_dir + "/" + json_file_in; 
    6875        JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in); 
     
    94101            } 
    95102 
    96             //ArrayList<String> ids = new ArrayList<String>(ef_num_pages); 
     103            ids = new ArrayList<String>(ef_num_pages); 
    97104            for (int i = 0; i < ef_page_count; i++) { 
    98105                String formatted_i = String.format("page-%06d", i); 
     
    104111 
    105112                String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; 
    106                 //ids.add(output_json_bz2); // **** 
     113                ids.add(page_id);  
    107114 
    108115                if (i==0) { 
     
    164171        _progress_accum.add(_progress_step); 
    165172         
    166         //return ids.iterator(); 
     173        return ids.iterator(); 
    167174    } 
    168175} 
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31252 r31266  
    2828    //   http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ 
    2929     
    30     protected static final int DEFAULT_NUM_CORES = 6; 
    31     protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;  
     30    //protected static final int DEFAULT_NUM_CORES = 6; 
     31    //protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;  
     32     
     33    protected static final int DEFAULT_FILES_PER_PARTITION = 3000; 
    3234     
    3335    protected String _input_dir; 
     
    9698        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
    9799        JavaSparkContext jsc = new JavaSparkContext(conf); 
    98          
     100             
     101        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
     102        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 
     103         
     104        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 
     105 
     106        long num_volumes = json_list_data.count(); 
     107        double per_vol = 100.0/(double)num_volumes; 
     108         
     109        int num_partitions = (int)(num_volumes/files_per_partition)+1; 
     110         
     111        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 
     112 
     113        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 
     114         
     115        System.err.println(); 
     116        System.err.println(); 
     117        System.err.println(); 
     118        System.err.println("****##### _input_dir =  " + _input_dir); 
     119        System.err.println(); 
     120        System.err.println(); 
     121        System.err.println(); 
     122         
     123        boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 
     124        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 
     125         
     126        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,  
     127                                                       _solr_url,_output_dir,_verbosity, progress_accum,per_vol, 
     128                                                       icu_tokenize,strict_file_io); 
     129 
     130        //json_list_data_rp.foreach(per_vol_json); 
     131        JavaRDD<String> per_page_ids = json_list_data_rp.flatMap(per_vol_json); 
     132        long num_page_ids = per_page_ids.count(); 
     133         
     134        long num_ids = num_volumes; 
     135         
     136        System.out.println(""); 
     137        System.out.println("############"); 
     138        System.out.println("# Number of page ids: " + num_page_ids); 
     139        System.out.println("############"); 
     140        System.out.println(""); 
     141 
     142        jsc.close(); 
     143    } 
     144     
     145     
     146     
     147    public void execPerPage() 
     148    {    
     149        String spark_app_name = generateSparkAppName("Per Page");        
     150         
     151        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
     152        JavaSparkContext jsc = new JavaSparkContext(conf); 
     153         
     154        /* 
    99155        if (_verbosity >= 2) { 
    100156            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 
    101157            System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 
    102158        } 
    103                  
    104         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
    105          
    106         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 
     159            */ 
     160         
     161        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
     162        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 
     163         
     164        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 
    107165 
    108166        long num_volumes = json_list_data.count(); 
    109167        double per_vol = 100.0/(double)num_volumes; 
    110168         
    111         //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100)); 
    112  
    113         DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 
    114          
    115         System.err.println(); 
    116         System.err.println(); 
    117         System.err.println(); 
    118         System.err.println("****##### _input_dir =  " + _input_dir); 
    119         System.err.println(); 
    120         System.err.println(); 
    121         System.err.println(); 
    122          
    123         boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 
    124         boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 
    125          
    126         PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,  
    127                                                        _solr_url,_output_dir,_verbosity, progress_accum,per_vol, 
    128                                                        icu_tokenize,strict_file_io); 
    129  
    130         json_list_data.foreach(per_vol_json); 
    131          
    132         long num_ids = num_volumes; 
    133          
    134         System.out.println(""); 
    135         System.out.println("############"); 
    136         System.out.println("# Number of volume ids: " + num_ids); 
    137         System.out.println("############"); 
    138         System.out.println(""); 
    139  
    140         jsc.close(); 
    141     } 
    142      
    143      
    144      
    145     public void execPerPage() 
    146     {    
    147         String spark_app_name = generateSparkAppName("Per Page");        
    148          
    149         SparkConf conf = new SparkConf().setAppName(spark_app_name); 
    150         JavaSparkContext jsc = new JavaSparkContext(conf); 
    151          
    152         if (_verbosity >= 2) { 
    153             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 
    154             System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 
    155         } 
    156                  
    157         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
    158         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 
    159  
    160         long num_volumes = json_list_data.count(); 
    161         double per_vol = 100.0/(double)num_volumes; 
    162          
    163         //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100)); 
     169        int num_partitions = (int)(num_volumes/files_per_partition)+1; 
     170        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 
    164171 
    165172        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); 
     
    173180                                     per_vol_progress_accum,per_vol, 
    174181                                     icu_tokenize,strict_file_io); 
    175         JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache(); 
     182        //JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap).cache(); 
     183        JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap); 
    176184         
    177185        //long num_page_ids = per_page_jsonobjects.count(); // trigger lazy eval of: flatmap:per-vol