Changeset 32106


Ignore:
Timestamp:
2018-01-16T22:39:16+13:00 (4 years ago)
Author:
davidb
Message:

Rekindle ability to process a json-filelist.txt using Spark

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
3 added
2 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeJSON.java

    r31784 r32106  
    2525
    2626//public class PerVolumeJSON implements VoidFunction<String>
    27 public class PerVolumeJSON implements Function<Text,Integer> 
     27public class PerVolumeJSON implements Function<Text,Integer>
    2828{
    2929    private static final long serialVersionUID = 1L;
    30     protected String _input_dir;
    31     protected String _whitelist_filename;
    32     protected String _langmap_directory;
    33    
    34     protected final ArrayList<String> _solr_endpoints;
    35     protected final int _solr_endpoints_len;
    36    
    37     //protected String _solr_url;
    38     protected String _output_dir;
    39    
    40     protected int    _verbosity;
    41    
    42     protected WhitelistBloomFilter _whitelist_bloomfilter;
    43     protected UniversalPOSLangMap _universal_langmap;
    44 
    45     boolean _icu_tokenize;
    46     boolean _strict_file_io;
     30    protected PerVolumeUtil _per_vol_util;
    4731
    4832    public PerVolumeJSON(String input_dir, String whitelist_filename, String langmap_directory,
     
    5034                         boolean icu_tokenize, boolean strict_file_io)
    5135    {
    52         System.out.println("*** PerVolumeJSON Constructor, langmap_directory = " + langmap_directory);
    5336       
    54         _input_dir  = input_dir;
    55         _whitelist_filename = whitelist_filename;
    56         _langmap_directory = langmap_directory;
    57        
    58         _solr_endpoints = solr_endpoints;
    59         _solr_endpoints_len = solr_endpoints.size();
    60        
    61         //_solr_url   = solr_url;
    62         _output_dir = output_dir;
    63         _verbosity  = verbosity;
    64        
    65         _icu_tokenize   = icu_tokenize;
    66         _strict_file_io = strict_file_io;
    67        
    68         _whitelist_bloomfilter = null;
    69         _universal_langmap = null;
     37        // Had issues with class not found in Spark when set up with inheritance
     38        _per_vol_util = new PerVolumeUtil(input_dir, whitelist_filename, langmap_directory,
     39                  solr_endpoints, output_dir, verbosity,
     40                  icu_tokenize, strict_file_io);
     41   
    7042    }
    7143   
     
    7446
    7547    {
    76             if (_whitelist_filename != null) {
    77 
    78             synchronized (_whitelist_filename) {
    79             if (_whitelist_bloomfilter == null) {
    80                
    81                 _whitelist_bloomfilter = new WhitelistBloomFilter(_whitelist_filename,true);
    82             }
    83             }
    84         }
    85        
    86         if (_langmap_directory != null) {
    87 
    88             synchronized (_langmap_directory) {
    89             if (_universal_langmap == null) {
    90                 _universal_langmap = new UniversalPOSLangMap(_langmap_directory);
    91             }
    92             }
    93         }
    94 
    95         int ef_num_pages = 0;
    96 
    97         String solr_url = null;
    98         if (_solr_endpoints_len > 0) {
    99             int random_choice = (int)(_solr_endpoints_len * Math.random());
    100             solr_url = _solr_endpoints.get(random_choice);
    101         }
    102        
    103         try {
    104 
    105 
    106             JSONObject extracted_feature_record  = new JSONObject(json_text.toString());
    107 
    108             if (extracted_feature_record != null) {
    109                 String volume_id = extracted_feature_record.getString("id");
    110 
    111                 JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
    112                 //String title= ef_metadata.getString("title");
    113 
    114                 //
    115                 // Top-level metadata Solr doc
    116                 //
    117                 JSONObject solr_add_metadata_doc_json = SolrDocJSON.generateToplevelMetadataSolrDocJSON(volume_id,ef_metadata);
    118                 if (solr_add_metadata_doc_json != null) {
    119                
    120                     if ((_verbosity >=2)) {
    121                         System.out.println("==================");
    122                         System.out.println("Metadata JSON: " + solr_add_metadata_doc_json.toString());
    123                         System.out.println("==================");
    124                     }
    125 
    126                     if (solr_url != null) {
    127 
    128                         if ((_verbosity >=2) ) {
    129                             System.out.println("==================");
    130                             System.out.println("Posting to: " + solr_url);
    131                             System.out.println("==================");
    132                         }
    133                         SolrDocJSON.postSolrDoc(solr_url, solr_add_metadata_doc_json, volume_id, "top-level-metadata");
    134                     }
    135                 }
    136                
    137                 //
    138                 // Now move on to POS extracted features per-page
    139                 //
    140                 boolean index_pages = true;
    141                 if (index_pages) {
    142                    
    143                     JSONObject ef_features = extracted_feature_record.getJSONObject("features");
    144 
    145                     int ef_page_count = ef_features.getInt("pageCount");
    146 
    147                     if (_verbosity >= 1) {
    148                         System.out.println("Processing: " + volume_id);
    149                         System.out.println("  pageCount = " + ef_page_count);
    150                     }
    151 
    152                     JSONArray ef_pages = ef_features.getJSONArray("pages");
    153                     ef_num_pages = ef_pages.length();
    154 
    155 
    156                     for (int i = 0; i < ef_page_count; i++) {
    157                         String formatted_i = String.format("page-%06d", i);
    158                         String page_id = volume_id + "." + formatted_i;
    159 
    160                         if (_verbosity >= 2) {
    161                             System.out.println("  Page: " + page_id);
    162                         }
    163 
    164 
    165                         JSONObject ef_page = ef_pages.getJSONObject(i);
    166 
    167                         if (ef_page != null) {
    168                             // Convert to Solr add form
    169                             JSONObject solr_add_doc_json
    170                             = SolrDocJSON.generateSolrDocJSON(volume_id, page_id,
    171                                                 ef_metadata, ef_page,
    172                                                 _whitelist_bloomfilter, _universal_langmap, _icu_tokenize);
    173 
    174 
    175                             if ((_verbosity >=2) && (i==20)) {
    176                                 System.out.println("==================");
    177                                 System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
    178                                 System.out.println("==================");
    179                             }
    180 
    181 
    182                             if (solr_url != null) {
    183                                 SolrDocJSON.postSolrDoc(solr_url, solr_add_doc_json,
    184                                         volume_id, page_id);
    185                             }
    186                         }
    187                         else {
    188                             System.err.println("Skipping: " + page_id);
    189                         }
    190 
    191                     }
    192                 }
    193                 else {
    194                     System.err.println("Skipping per-page POS text indexing");
    195                 }
    196 
    197             }
    198         }
    199         catch (Exception e) {
    200             if (_strict_file_io) {
    201                 throw e;
    202             }
    203             else {
    204                 e.printStackTrace();
    205             }
    206         }
    207        
    208         return ef_num_pages;
    209 
     48        return _per_vol_util.call(json_text);
    21049    }
    211        
    212         /*
    213     //public void call(String json_file_in) throws IOException
    214     public Integer call(String json_file_in) throws IOException
    215    
    216     {
    217         if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) {
    218             _whitelist_bloomfilter = new WhitelistBloomFilter(_whitelist_filename,true);
    219         }
    220 
    221         int ef_num_pages = 0;
    222        
    223         ArrayList<String> ids = new ArrayList<String>(); // want it to be non-null so can return valid iterator
    224        
    225         String full_json_file_in = _input_dir + "/" + json_file_in;
    226         JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in);
    227        
    228         if (extracted_feature_record != null) {
    229             String volume_id = extracted_feature_record.getString("id");
    230 
    231             //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
    232             //String title= ef_metadata.getString("title");
    233 
    234             JSONObject ef_features = extracted_feature_record.getJSONObject("features");
    235 
    236             int ef_page_count = ef_features.getInt("pageCount");
    237 
    238             if (_verbosity >= 1) {
    239                 System.out.println("Processing: " + json_file_in);
    240                 System.out.println("  pageCount = " + ef_page_count);
    241             }
    242 
    243             JSONArray ef_pages = ef_features.getJSONArray("pages");
    244             ef_num_pages = ef_pages.length();
    245 
    246             // Make directory for page-level JSON output
    247             String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2");
    248             String page_json_dir = json_dir + "/pages";
    249 
    250             if (_output_dir != null) {
    251                 ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
    252             }
    253            
    254             ids = new ArrayList<String>(ef_num_pages);
    255             for (int i = 0; i < ef_page_count; i++) {
    256                 String formatted_i = String.format("page-%06d", i);
    257                 String page_id = volume_id + "." + formatted_i;
    258 
    259                 if (_verbosity >= 2) {
    260                     System.out.println("  Page: " + page_id);
    261                 }
    262 
    263                 String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2";
    264                 ids.add(page_id);
    265 
    266                 if (_verbosity >=2) {
    267                     if (i==0) {
    268                         System.out.println("Sample output JSON page file [i=0]: " + output_json_bz2);
    269                     }
    270                 }
    271                 JSONObject ef_page = ef_pages.getJSONObject(i);
    272 
    273                 if (ef_page != null) {
    274                     // Convert to Solr add form
    275                     JSONObject solr_add_doc_json
    276                     = SolrDocJSON.generateSolrDocJSON(volume_id, page_id, ef_page, _whitelist_bloomfilter, _icu_tokenize);
    277 
    278 
    279                     if ((_verbosity >=2) && (i==20)) {
    280                         System.out.println("==================");
    281                         System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
    282                         System.out.println("==================");
    283                     }
    284 
    285 
    286                     if (_solr_url != null) {
    287                         if ((_verbosity >=2) && (i==20)) {
    288                             System.out.println("==================");
    289                             System.out.println("Posting to: " + _solr_url);
    290                             System.out.println("==================");
    291                         }
    292                         SolrDocJSON.postSolrDoc(_solr_url, solr_add_doc_json);
    293                     }
    294 
    295                     if (_output_dir != null) {
    296                         if ((_verbosity >=2) && (i==20)) {
    297                             System.out.println("==================");
    298                             System.out.println("Saving to: " + _output_dir);
    299                             System.out.println("==================");
    300                         }
    301                         SolrDocJSON.saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2);
    302                     }
    303                 }
    304                 else {
    305                     System.err.println("Skipping: " + page_id);
    306                 }
    307 
    308             }
    309         }
    310         else {
    311             // File did not exist, or could not be parsed
    312             String mess = "Failed to read in bzipped JSON file '" + full_json_file_in + "'";
    313             if (_strict_file_io) {
    314                 throw new IOException(mess);
    315             }
    316             else {
    317                 System.err.println("Warning: " + mess);
    318                 System.out.println("Warning: " + mess);
    319             }
    320         }
    321        
    322         return ef_num_pages;
    323                
    324     }
    325     */
    32650}
    32751
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31597 r32106  
    142142   
    143143    /*
    144     public void execPerVolume()
     144    public void execPerVolumeJSONFileList()
    145145    {   
    146146        String spark_app_name = generateSparkAppName("Per Volume");     
     
    168168        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
    169169       
    170         PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,
    171                                                        _solr_url,_output_dir,_verbosity, progress_accum,per_vol,
    172                                                        icu_tokenize,strict_file_io);
     170        ArrayList<String> solr_endpoints = extrapolateSolrEndpoints(_solr_collection);
     171       
     172        //PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename,
     173        //                                             _solr_url,_output_dir,_verbosity, progress_accum,per_vol,
     174        //                                             icu_tokenize,strict_file_io);
     175        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename, _langmap_directory,
     176                                                        solr_endpoints,_output_dir,_verbosity,
     177                                                        icu_tokenize,strict_file_io);
    173178
    174179        //json_list_data_rp.foreach(per_vol_json);
Note: See TracChangeset for help on using the changeset viewer.