Changeset 31372


Ignore:
Timestamp:
2017-01-31T00:06:39+13:00 (4 years ago)
Author:
davidb
Message:

Reworked to use sequenceFiles

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeJSON.java

    r31278 r31372  
    55import java.util.Iterator;
    66
     7import org.apache.hadoop.io.Text;
    78import org.apache.spark.api.java.function.FlatMapFunction;
     9import org.apache.spark.api.java.function.Function;
    810import org.apache.spark.api.java.function.VoidFunction;
    911import org.apache.spark.util.DoubleAccumulator;
     
    2224
    2325//public class PerVolumeJSON implements VoidFunction<String>
    24 public class PerVolumeJSON implements FlatMapFunction<String,String>
     26public class PerVolumeJSON implements Function<Text,Integer>
    2527{
    2628    private static final long serialVersionUID = 1L;
     
    3537    protected WhitelistBloomFilter _whitelist_bloomfilter;
    3638   
    37     protected DoubleAccumulator _progress_accum;
    38     protected double            _progress_step;
     39
    3940   
    4041     boolean _icu_tokenize;
     
    4344    public PerVolumeJSON(String input_dir, String whitelist_filename,
    4445                         String solr_url, String output_dir, int verbosity,
    45                          DoubleAccumulator progress_accum, double progress_step,
    4646                         boolean icu_tokenize, boolean strict_file_io)
    4747    {
     
    5353        _verbosity  = verbosity;
    5454       
    55         _progress_accum = progress_accum;
    56         _progress_step  = progress_step;
    57        
    5855        _icu_tokenize   = icu_tokenize;
    5956        _strict_file_io = strict_file_io;
     
    6259    }
    6360   
    64     //public void call(String json_file_in) throws IOException
    65     public Iterator<String> call(String json_file_in) throws IOException
    66    
     61   
     62    public Integer call(Text json_text) throws IOException
     63
    6764    {
    6865        if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) {
     
    7067        }
    7168
     69        int ef_num_pages = 0;
     70
     71        try {
     72
     73
     74            JSONObject extracted_feature_record  = new JSONObject(json_text.toString());
     75
     76            if (extracted_feature_record != null) {
     77                String volume_id = extracted_feature_record.getString("id");
     78
     79                //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
     80                //String title= ef_metadata.getString("title");
     81
     82                JSONObject ef_features = extracted_feature_record.getJSONObject("features");
     83
     84                int ef_page_count = ef_features.getInt("pageCount");
     85
     86                if (_verbosity >= 1) {
     87                    System.out.println("Processing: " + volume_id);
     88                    System.out.println("  pageCount = " + ef_page_count);
     89                }
     90
     91                JSONArray ef_pages = ef_features.getJSONArray("pages");
     92                ef_num_pages = ef_pages.length();
     93
     94
     95                for (int i = 0; i < ef_page_count; i++) {
     96                    String formatted_i = String.format("page-%06d", i);
     97                    String page_id = volume_id + "." + formatted_i;
     98
     99                    if (_verbosity >= 2) {
     100                        System.out.println("  Page: " + page_id);
     101                    }
     102
     103
     104                    JSONObject ef_page = ef_pages.getJSONObject(i);
     105
     106                    if (ef_page != null) {
     107                        // Convert to Solr add form
     108                        JSONObject solr_add_doc_json
     109                        = SolrDocJSON.generateSolrDocJSON(volume_id, page_id, ef_page, _whitelist_bloomfilter, _icu_tokenize);
     110
     111
     112                        if ((_verbosity >=2) && (i==20)) {
     113                            System.out.println("==================");
     114                            System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
     115                            System.out.println("==================");
     116                        }
     117
     118
     119                        if (_solr_url != null) {
     120                            if ((_verbosity >=2) && (i==20)) {
     121                                System.out.println("==================");
     122                                System.out.println("Posting to: " + _solr_url);
     123                                System.out.println("==================");
     124                            }
     125                            SolrDocJSON.postSolrDoc(_solr_url, solr_add_doc_json);
     126                        }
     127
     128
     129                    }
     130                    else {
     131                        System.err.println("Skipping: " + page_id);
     132                    }
     133
     134                }
     135            }
     136        }
     137        catch (Exception e) {
     138            if (_strict_file_io) {
     139                throw e;
     140            }
     141            else {
     142                e.printStackTrace();
     143            }
     144        }
     145       
     146        return ef_num_pages;
     147
     148    }
     149       
     150        /*
     151    //public void call(String json_file_in) throws IOException
     152    public Integer call(String json_file_in) throws IOException
     153   
     154    {
     155        if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) {
     156            _whitelist_bloomfilter = new WhitelistBloomFilter(_whitelist_filename,true);
     157        }
     158
     159        int ef_num_pages = 0;
     160       
    72161        ArrayList<String> ids = new ArrayList<String>(); // want it to be non-null so can return valid iterator
    73162       
     
    91180
    92181            JSONArray ef_pages = ef_features.getJSONArray("pages");
    93             int ef_num_pages = ef_pages.length();
     182            ef_num_pages = ef_pages.length();
    94183
    95184            // Make directory for page-level JSON output
     
    169258        }
    170259       
    171         //ids.add(volume_id);
    172         _progress_accum.add(_progress_step);
    173        
    174         return ids.iterator();
     260        return ef_num_pages;
     261               
    175262    }
     263    */
    176264}
    177265
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31277 r31372  
    99
    1010import org.apache.commons.cli.*;
    11 
     11import org.apache.hadoop.io.Text;
    1212import org.apache.spark.api.java.*;
    1313import org.apache.spark.util.DoubleAccumulator;
     
    9292    }
    9393   
     94    public void execPerVolumeSequenceFile()
     95    {
     96        String spark_app_name = generateSparkAppName("Per Volume");     
     97
     98        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     99        JavaSparkContext jsc = new JavaSparkContext(conf);
     100        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
     101
     102        //String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";
     103        String packed_sequence_path = _json_list_filename;
     104       
     105        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
     106   
     107        JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);
     108       
     109        boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize");
     110        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
     111       
     112        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,
     113                                                       _solr_url,_output_dir,_verbosity,
     114                                                       icu_tokenize,strict_file_io);
     115
     116       
     117        JavaRDD<Integer> per_volume_page_count = json_text_rdd.map(per_vol_json);
     118       
     119        Integer num_page_ids = per_volume_page_count.reduce((a, b) -> a + b);
     120       
     121        System.out.println("");
     122        System.out.println("############");
     123        System.out.println("# Number of page ids: " + num_page_ids);
     124        System.out.println("############");
     125        System.out.println("");
     126
     127        jsc.close();
     128       
     129    }
     130   
     131    /*
    94132    public void execPerVolume()
    95133    {   
     
    136174        jsc.close();
    137175    }
    138    
    139    
    140    
     176    */
     177   
     178    /*
    141179    public void execPerPage()
    142180    {   
     
    146184        JavaSparkContext jsc = new JavaSparkContext(conf);
    147185       
    148         /*
    149         if (_verbosity >= 2) {
    150             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    151             System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    152         }
    153             */
     186       
    154187       
    155188        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
     
    188221        JavaRDD<String> per_page_ids = per_page_jsonobjects.map(paged_json_id_map);
    189222
    190 /*
    191         System.out.println("");
    192         System.out.println("############");
    193         System.out.println("# Progress Accumulator: " + progress_accum.value());
    194         System.out.println("############");
    195         System.out.println("");
     223       
     224        long num_page_ids = per_page_ids.count(); // trigger lazy eval of: flatmap:per-vol -> map:per-page
     225       
     226        System.out.println("");
     227        System.out.println("############");
     228        System.out.println("# Number of page ids: " + num_page_ids);
     229        System.out.println("############");
     230        System.out.println("");
     231
     232       
     233        //if (_output_dir != null) {
     234            //String rdd_save_file = "rdd-solr-json-page-files";
     235            //json_ids.saveAsTextFile(rdd_save_file);
     236            //System.out.println("############");
     237            //System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
     238            //System.out.println("#  " + rdd_save_file);
     239            //System.out.println("############");
     240            //System.out.println("");
     241        //}
     242       
     243       
     244        jsc.close();
     245    }
    196246*/
    197        
    198         long num_page_ids = per_page_ids.count(); // trigger lazy eval of: flatmap:per-vol -> map:per-page
    199        
    200         System.out.println("");
    201         System.out.println("############");
    202         System.out.println("# Number of page ids: " + num_page_ids);
    203         System.out.println("############");
    204         System.out.println("");
    205 
    206         /*
    207         if (_output_dir != null) {
    208             String rdd_save_file = "rdd-solr-json-page-files";
    209             json_ids.saveAsTextFile(rdd_save_file);
    210             System.out.println("############");
    211             System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
    212             System.out.println("#  " + rdd_save_file);
    213             System.out.println("############");
    214             System.out.println("");
    215         }
    216         */
    217        
    218         jsc.close();
    219     }
    220 
    221247   
    222248   
     
    322348            = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);
    323349           
     350        prep_for_ingest.execPerVolumeSequenceFile();
     351       
     352        /*
    324353        String process_ef_json_mode = System.getProperty("wcsa-ef-ingest.process-ef-json-mode","per-page");
    325354        if (process_ef_json_mode.equals("per-volume")) {
     
    328357        else {
    329358            prep_for_ingest.execPerPage();
    330         }
     359        }*/
    331360    }
    332361}
Note: See TracChangeset for help on using the changeset viewer.