Changeset 31372 for other-projects

Show
Ignore:
Timestamp:
31.01.2017 00:06:39 (2 years ago)
Author:
davidb
Message:

Reworked to use sequenceFiles

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeJSON.java

    r31278 r31372  
    55import java.util.Iterator; 
    66 
     7import org.apache.hadoop.io.Text; 
    78import org.apache.spark.api.java.function.FlatMapFunction; 
     9import org.apache.spark.api.java.function.Function; 
    810import org.apache.spark.api.java.function.VoidFunction; 
    911import org.apache.spark.util.DoubleAccumulator; 
     
    2224 
    2325//public class PerVolumeJSON implements VoidFunction<String>  
    24 public class PerVolumeJSON implements FlatMapFunction<String,String>  
     26public class PerVolumeJSON implements Function<Text,Integer>  
    2527{ 
    2628    private static final long serialVersionUID = 1L; 
     
    3537    protected WhitelistBloomFilter _whitelist_bloomfilter; 
    3638     
    37     protected DoubleAccumulator _progress_accum; 
    38     protected double            _progress_step; 
     39 
    3940     
    4041     boolean _icu_tokenize; 
     
    4344    public PerVolumeJSON(String input_dir, String whitelist_filename, 
    4445                         String solr_url, String output_dir, int verbosity,  
    45                          DoubleAccumulator progress_accum, double progress_step, 
    4646                         boolean icu_tokenize, boolean strict_file_io) 
    4747    { 
     
    5353        _verbosity  = verbosity; 
    5454         
    55         _progress_accum = progress_accum; 
    56         _progress_step  = progress_step; 
    57          
    5855        _icu_tokenize   = icu_tokenize; 
    5956        _strict_file_io = strict_file_io; 
     
    6259    } 
    6360     
    64     //public void call(String json_file_in) throws IOException 
    65     public Iterator<String> call(String json_file_in) throws IOException 
    66      
     61     
     62    public Integer call(Text json_text) throws IOException 
     63 
    6764    {  
    6865        if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) { 
     
    7067        } 
    7168 
     69        int ef_num_pages = 0; 
     70 
     71        try { 
     72 
     73 
     74            JSONObject extracted_feature_record  = new JSONObject(json_text.toString()); 
     75 
     76            if (extracted_feature_record != null) { 
     77                String volume_id = extracted_feature_record.getString("id"); 
     78 
     79                //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); 
     80                //String title= ef_metadata.getString("title"); 
     81 
     82                JSONObject ef_features = extracted_feature_record.getJSONObject("features"); 
     83 
     84                int ef_page_count = ef_features.getInt("pageCount"); 
     85 
     86                if (_verbosity >= 1) { 
     87                    System.out.println("Processing: " + volume_id); 
     88                    System.out.println("  pageCount = " + ef_page_count); 
     89                } 
     90 
     91                JSONArray ef_pages = ef_features.getJSONArray("pages"); 
     92                ef_num_pages = ef_pages.length(); 
     93 
     94 
     95                for (int i = 0; i < ef_page_count; i++) { 
     96                    String formatted_i = String.format("page-%06d", i); 
     97                    String page_id = volume_id + "." + formatted_i; 
     98 
     99                    if (_verbosity >= 2) { 
     100                        System.out.println("  Page: " + page_id); 
     101                    } 
     102 
     103 
     104                    JSONObject ef_page = ef_pages.getJSONObject(i); 
     105 
     106                    if (ef_page != null) { 
     107                        // Convert to Solr add form 
     108                        JSONObject solr_add_doc_json  
     109                        = SolrDocJSON.generateSolrDocJSON(volume_id, page_id, ef_page, _whitelist_bloomfilter, _icu_tokenize); 
     110 
     111 
     112                        if ((_verbosity >=2) && (i==20)) { 
     113                            System.out.println("=================="); 
     114                            System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString()); 
     115                            System.out.println("=================="); 
     116                        } 
     117 
     118 
     119                        if (_solr_url != null) { 
     120                            if ((_verbosity >=2) && (i==20)) { 
     121                                System.out.println("=================="); 
     122                                System.out.println("Posting to: " + _solr_url); 
     123                                System.out.println("=================="); 
     124                            } 
     125                            SolrDocJSON.postSolrDoc(_solr_url, solr_add_doc_json); 
     126                        } 
     127 
     128 
     129                    } 
     130                    else { 
     131                        System.err.println("Skipping: " + page_id); 
     132                    } 
     133 
     134                } 
     135            } 
     136        } 
     137        catch (Exception e) { 
     138            if (_strict_file_io) { 
     139                throw e; 
     140            } 
     141            else { 
     142                e.printStackTrace(); 
     143            } 
     144        } 
     145         
     146        return ef_num_pages; 
     147 
     148    } 
     149         
     150        /* 
     151    //public void call(String json_file_in) throws IOException 
     152    public Integer call(String json_file_in) throws IOException 
     153     
     154    {  
     155        if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) { 
     156            _whitelist_bloomfilter = new WhitelistBloomFilter(_whitelist_filename,true); 
     157        } 
     158 
     159        int ef_num_pages = 0; 
     160         
    72161        ArrayList<String> ids = new ArrayList<String>(); // want it to be non-null so can return valid iterator 
    73162         
     
    91180 
    92181            JSONArray ef_pages = ef_features.getJSONArray("pages"); 
    93             int ef_num_pages = ef_pages.length(); 
     182            ef_num_pages = ef_pages.length(); 
    94183 
    95184            // Make directory for page-level JSON output 
     
    169258        } 
    170259         
    171         //ids.add(volume_id); 
    172         _progress_accum.add(_progress_step); 
    173          
    174         return ids.iterator(); 
     260        return ef_num_pages; 
     261                 
    175262    } 
     263    */ 
    176264} 
    177265 
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31277 r31372  
    99 
    1010import org.apache.commons.cli.*; 
    11  
     11import org.apache.hadoop.io.Text; 
    1212import org.apache.spark.api.java.*; 
    1313import org.apache.spark.util.DoubleAccumulator; 
     
    9292    } 
    9393     
     94    public void execPerVolumeSequenceFile() 
     95    { 
     96        String spark_app_name = generateSparkAppName("Per Volume");      
     97 
     98        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
     99        JavaSparkContext jsc = new JavaSparkContext(conf); 
     100        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin"); 
     101 
     102        //String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef"; 
     103        String packed_sequence_path = _json_list_filename; 
     104         
     105        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class); 
     106     
     107        JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2); 
     108         
     109        boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 
     110        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 
     111         
     112        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,  
     113                                                       _solr_url,_output_dir,_verbosity, 
     114                                                       icu_tokenize,strict_file_io); 
     115 
     116         
     117        JavaRDD<Integer> per_volume_page_count = json_text_rdd.map(per_vol_json); 
     118         
     119        Integer num_page_ids = per_volume_page_count.reduce((a, b) -> a + b); 
     120         
     121        System.out.println(""); 
     122        System.out.println("############"); 
     123        System.out.println("# Number of page ids: " + num_page_ids); 
     124        System.out.println("############"); 
     125        System.out.println(""); 
     126 
     127        jsc.close(); 
     128         
     129    } 
     130     
     131    /* 
    94132    public void execPerVolume() 
    95133    {    
     
    136174        jsc.close(); 
    137175    } 
    138      
    139      
    140      
     176    */ 
     177     
     178    /* 
    141179    public void execPerPage() 
    142180    {    
     
    146184        JavaSparkContext jsc = new JavaSparkContext(conf); 
    147185         
    148         /* 
    149         if (_verbosity >= 2) { 
    150             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 
    151             System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 
    152         } 
    153             */ 
     186         
    154187         
    155188        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
     
    188221        JavaRDD<String> per_page_ids = per_page_jsonobjects.map(paged_json_id_map); 
    189222 
    190 /* 
    191         System.out.println(""); 
    192         System.out.println("############"); 
    193         System.out.println("# Progress Accumulator: " + progress_accum.value()); 
    194         System.out.println("############"); 
    195         System.out.println(""); 
     223         
     224        long num_page_ids = per_page_ids.count(); // trigger lazy eval of: flatmap:per-vol -> map:per-page 
     225         
     226        System.out.println(""); 
     227        System.out.println("############"); 
     228        System.out.println("# Number of page ids: " + num_page_ids); 
     229        System.out.println("############"); 
     230        System.out.println(""); 
     231 
     232         
     233        //if (_output_dir != null) { 
     234            //String rdd_save_file = "rdd-solr-json-page-files"; 
     235            //json_ids.saveAsTextFile(rdd_save_file); 
     236            //System.out.println("############"); 
     237            //System.out.println("# Saved RDD of Solr JSON page files, top-level, as:"); 
     238            //System.out.println("#  " + rdd_save_file); 
     239            //System.out.println("############"); 
     240            //System.out.println(""); 
     241        //} 
     242         
     243         
     244        jsc.close(); 
     245    } 
    196246*/ 
    197          
    198         long num_page_ids = per_page_ids.count(); // trigger lazy eval of: flatmap:per-vol -> map:per-page 
    199          
    200         System.out.println(""); 
    201         System.out.println("############"); 
    202         System.out.println("# Number of page ids: " + num_page_ids); 
    203         System.out.println("############"); 
    204         System.out.println(""); 
    205  
    206         /* 
    207         if (_output_dir != null) { 
    208             String rdd_save_file = "rdd-solr-json-page-files"; 
    209             json_ids.saveAsTextFile(rdd_save_file); 
    210             System.out.println("############"); 
    211             System.out.println("# Saved RDD of Solr JSON page files, top-level, as:"); 
    212             System.out.println("#  " + rdd_save_file); 
    213             System.out.println("############"); 
    214             System.out.println(""); 
    215         } 
    216         */ 
    217          
    218         jsc.close(); 
    219     } 
    220  
    221247     
    222248     
     
    322348            = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity); 
    323349             
     350        prep_for_ingest.execPerVolumeSequenceFile(); 
     351         
     352        /* 
    324353        String process_ef_json_mode = System.getProperty("wcsa-ef-ingest.process-ef-json-mode","per-page"); 
    325354        if (process_ef_json_mode.equals("per-volume")) {  
     
    328357        else { 
    329358            prep_for_ingest.execPerPage(); 
    330         } 
     359        }*/ 
    331360    } 
    332361}