Ignore:
Timestamp:
2017-01-31T00:06:39+13:00 (7 years ago)
Author:
davidb
Message:

Reworked to use sequenceFiles

File:
1 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31277 r31372  
    99
    1010import org.apache.commons.cli.*;
    11 
     11import org.apache.hadoop.io.Text;
    1212import org.apache.spark.api.java.*;
    1313import org.apache.spark.util.DoubleAccumulator;
     
    9292    }
    9393   
     94    public void execPerVolumeSequenceFile()
     95    {
     96        String spark_app_name = generateSparkAppName("Per Volume");     
     97
     98        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     99        JavaSparkContext jsc = new JavaSparkContext(conf);
     100        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
     101
     102        //String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";
     103        String packed_sequence_path = _json_list_filename;
     104       
     105        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
     106   
     107        JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);
     108       
     109        boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize");
     110        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
     111       
     112        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,
     113                                                       _solr_url,_output_dir,_verbosity,
     114                                                       icu_tokenize,strict_file_io);
     115
     116       
     117        JavaRDD<Integer> per_volume_page_count = json_text_rdd.map(per_vol_json);
     118       
     119        Integer num_page_ids = per_volume_page_count.reduce((a, b) -> a + b);
     120       
     121        System.out.println("");
     122        System.out.println("############");
     123        System.out.println("# Number of page ids: " + num_page_ids);
     124        System.out.println("############");
     125        System.out.println("");
     126
     127        jsc.close();
     128       
     129    }
     130   
     131    /*
    94132    public void execPerVolume()
    95133    {   
     
    136174        jsc.close();
    137175    }
    138    
    139    
    140    
     176    */
     177   
     178    /*
    141179    public void execPerPage()
    142180    {   
     
    146184        JavaSparkContext jsc = new JavaSparkContext(conf);
    147185       
    148         /*
    149         if (_verbosity >= 2) {
    150             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    151             System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    152         }
    153             */
     186       
    154187       
    155188        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
     
    188221        JavaRDD<String> per_page_ids = per_page_jsonobjects.map(paged_json_id_map);
    189222
    190 /*
    191         System.out.println("");
    192         System.out.println("############");
    193         System.out.println("# Progress Accumulator: " + progress_accum.value());
    194         System.out.println("############");
    195         System.out.println("");
     223       
     224        long num_page_ids = per_page_ids.count(); // trigger lazy eval of: flatmap:per-vol -> map:per-page
     225       
     226        System.out.println("");
     227        System.out.println("############");
     228        System.out.println("# Number of page ids: " + num_page_ids);
     229        System.out.println("############");
     230        System.out.println("");
     231
     232       
     233        //if (_output_dir != null) {
     234            //String rdd_save_file = "rdd-solr-json-page-files";
     235            //json_ids.saveAsTextFile(rdd_save_file);
     236            //System.out.println("############");
     237            //System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
     238            //System.out.println("#  " + rdd_save_file);
     239            //System.out.println("############");
     240            //System.out.println("");
     241        //}
     242       
     243       
     244        jsc.close();
     245    }
    196246*/
    197        
    198         long num_page_ids = per_page_ids.count(); // trigger lazy eval of: flatmap:per-vol -> map:per-page
    199        
    200         System.out.println("");
    201         System.out.println("############");
    202         System.out.println("# Number of page ids: " + num_page_ids);
    203         System.out.println("############");
    204         System.out.println("");
    205 
    206         /*
    207         if (_output_dir != null) {
    208             String rdd_save_file = "rdd-solr-json-page-files";
    209             json_ids.saveAsTextFile(rdd_save_file);
    210             System.out.println("############");
    211             System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
    212             System.out.println("#  " + rdd_save_file);
    213             System.out.println("############");
    214             System.out.println("");
    215         }
    216         */
    217        
    218         jsc.close();
    219     }
    220 
    221247   
    222248   
     
    322348            = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);
    323349           
     350        prep_for_ingest.execPerVolumeSequenceFile();
     351       
     352        /*
    324353        String process_ef_json_mode = System.getProperty("wcsa-ef-ingest.process-ef-json-mode","per-page");
    325354        if (process_ef_json_mode.equals("per-volume")) {
     
    328357        else {
    329358            prep_for_ingest.execPerPage();
    330         }
     359        }*/
    331360    }
    332361}
Note: See TracChangeset for help on using the changeset viewer.