Changeset 31266


Ignore:
Timestamp:
12/27/16 18:51:42 (4 years ago)
Author:
davidb
Message:

Rekindling of per-volume approach. Also some tweaking to verbosity debug printing in per-page

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerPageJSONFlatmap.java

    r31252 r31266  
    105105                ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
    106106            }
    107             if (_verbosity >= 2) {
     107            if (_verbosity >= 3) {
    108108                System.out.print("  Pages: ");
    109109            }
     
    113113                String page_id = volume_id + "." + formatted_i;
    114114
    115                 if (_verbosity >= 2) {
     115                if (_verbosity >= 3) {
    116116                    if (i>0) {
    117117                        System.out.print(", ");
     
    123123               
    124124                if (i==(ef_page_count-1)) {
    125                     if (_verbosity >= 2) {
     125                    if (_verbosity >= 3) {
    126126                        System.out.println();
    127127                    }
    128                     System.out.println("Sample output JSON page file: " + output_json_bz2);
     128                    if (_verbosity >= 2) {
     129                        System.out.println("Sample output JSON page file: " + output_json_bz2);
     130                    }
    129131                }
    130132
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeJSON.java

    r31252 r31266  
    22
    33import java.io.IOException;
     4import java.util.ArrayList;
     5import java.util.Iterator;
    46
     7import org.apache.spark.api.java.function.FlatMapFunction;
    58import org.apache.spark.api.java.function.VoidFunction;
    69import org.apache.spark.util.DoubleAccumulator;
     
    1821
    1922
    20 public class PerVolumeJSON implements VoidFunction<String>
     23//public class PerVolumeJSON implements VoidFunction<String>
     24public class PerVolumeJSON implements FlatMapFunction<String,String>
    2125{
    2226    private static final long serialVersionUID = 1L;
     
    5862    }
    5963   
    60     //public Iterator<String> call(String json_file_in)
    61     public void call(String json_file_in) throws IOException
     64    //public void call(String json_file_in) throws IOException
     65    public Iterator<String> call(String json_file_in) throws IOException
     66   
    6267    {
    6368        if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) {
     
    6570        }
    6671
     72        ArrayList<String> ids = null;
     73       
    6774        String full_json_file_in = _input_dir + "/" + json_file_in;
    6875        JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in);
     
    94101            }
    95102
    96             //ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
     103            ids = new ArrayList<String>(ef_num_pages);
    97104            for (int i = 0; i < ef_page_count; i++) {
    98105                String formatted_i = String.format("page-%06d", i);
     
    104111
    105112                String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2";
    106                 //ids.add(output_json_bz2); // ****
     113                ids.add(page_id);
    107114
    108115                if (i==0) {
     
    164171        _progress_accum.add(_progress_step);
    165172       
    166         //return ids.iterator();
     173        return ids.iterator();
    167174    }
    168175}
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31252 r31266  
    2828    //   http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
    2929   
    30     protected static final int DEFAULT_NUM_CORES = 6;
    31     protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;
     30    //protected static final int DEFAULT_NUM_CORES = 6;
     31    //protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES;
     32   
     33    protected static final int DEFAULT_FILES_PER_PARTITION = 3000;
    3234   
    3335    protected String _input_dir;
     
    9698        SparkConf conf = new SparkConf().setAppName(spark_app_name);
    9799        JavaSparkContext jsc = new JavaSparkContext(conf);
    98        
     100           
     101        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
     102        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION);
     103       
     104        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
     105
     106        long num_volumes = json_list_data.count();
     107        double per_vol = 100.0/(double)num_volumes;
     108       
     109        int num_partitions = (int)(num_volumes/files_per_partition)+1;
     110       
     111        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
     112
     113        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
     114       
     115        System.err.println();
     116        System.err.println();
     117        System.err.println();
     118        System.err.println("****##### _input_dir =  " + _input_dir);
     119        System.err.println();
     120        System.err.println();
     121        System.err.println();
     122       
     123        boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize");
     124        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
     125       
     126        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,
     127                                                       _solr_url,_output_dir,_verbosity, progress_accum,per_vol,
     128                                                       icu_tokenize,strict_file_io);
     129
     130        //json_list_data_rp.foreach(per_vol_json);
     131        JavaRDD<String> per_page_ids = json_list_data_rp.flatMap(per_vol_json);
     132        long num_page_ids = per_page_ids.count();
     133       
     134        long num_ids = num_volumes;
     135       
     136        System.out.println("");
     137        System.out.println("############");
     138        System.out.println("# Number of page ids: " + num_page_ids);
     139        System.out.println("############");
     140        System.out.println("");
     141
     142        jsc.close();
     143    }
     144   
     145   
     146   
     147    public void execPerPage()
     148    {   
     149        String spark_app_name = generateSparkAppName("Per Page");       
     150       
     151        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     152        JavaSparkContext jsc = new JavaSparkContext(conf);
     153       
     154        /*
    99155        if (_verbosity >= 2) {
    100156            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    101157            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    102158        }
    103                
    104         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
    105        
    106         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache();
     159            */
     160       
     161        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
     162        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION);
     163       
     164        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
    107165
    108166        long num_volumes = json_list_data.count();
    109167        double per_vol = 100.0/(double)num_volumes;
    110168       
    111         //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100));
    112 
    113         DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
    114        
    115         System.err.println();
    116         System.err.println();
    117         System.err.println();
    118         System.err.println("****##### _input_dir =  " + _input_dir);
    119         System.err.println();
    120         System.err.println();
    121         System.err.println();
    122        
    123         boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize");
    124         boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
    125        
    126         PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,
    127                                                        _solr_url,_output_dir,_verbosity, progress_accum,per_vol,
    128                                                        icu_tokenize,strict_file_io);
    129 
    130         json_list_data.foreach(per_vol_json);
    131        
    132         long num_ids = num_volumes;
    133        
    134         System.out.println("");
    135         System.out.println("############");
    136         System.out.println("# Number of volume ids: " + num_ids);
    137         System.out.println("############");
    138         System.out.println("");
    139 
    140         jsc.close();
    141     }
    142    
    143    
    144    
    145     public void execPerPage()
    146     {   
    147         String spark_app_name = generateSparkAppName("Per Page");       
    148        
    149         SparkConf conf = new SparkConf().setAppName(spark_app_name);
    150         JavaSparkContext jsc = new JavaSparkContext(conf);
    151        
    152         if (_verbosity >= 2) {
    153             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    154             System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    155         }
    156                
    157         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
    158         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache();
    159 
    160         long num_volumes = json_list_data.count();
    161         double per_vol = 100.0/(double)num_volumes;
    162        
    163         //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100));
     169        int num_partitions = (int)(num_volumes/files_per_partition)+1;
     170        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
    164171
    165172        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");
     
    173180                                     per_vol_progress_accum,per_vol,
    174181                                     icu_tokenize,strict_file_io);
    175         JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache();
     182        //JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap).cache();
     183        JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap);
    176184       
    177185        //long num_page_ids = per_page_jsonobjects.count(); // trigger lazy eval of: flatmap:per-vol
Note: See TracChangeset for help on using the changeset viewer.