Show
Ignore:
Timestamp:
30.10.2016 23:51:07 (3 years ago)
Author:
davidb
Message:

Code to work per-volume and per-page

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures
Files:
1 added
3 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/JSONSolrTransform.java

    r30996 r31001  
    11package org.hathitrust.extractedfeatures; 
    22 
     3import java.io.BufferedReader; 
     4import java.io.BufferedWriter; 
     5import java.io.IOException; 
     6import java.io.InputStreamReader; 
     7import java.io.OutputStream; 
     8import java.net.HttpURLConnection; 
     9import java.net.URL; 
    310import java.util.Iterator; 
    411 
     12import org.apache.commons.compress.compressors.CompressorException; 
    513import org.json.JSONObject; 
    614 
     
    115123    } 
    116124 
     125    public static void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2) 
     126    { 
     127        try { 
     128            BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(output_file_json_bz2); 
     129            bw.write(solr_add_doc_json.toString()); 
     130            bw.close(); 
     131        } catch (IOException e) { 
     132            e.printStackTrace(); 
     133        } catch (CompressorException e) { 
     134            e.printStackTrace(); 
     135        } 
     136    } 
     137     
     138    public static void postSolrDoc(String post_url, JSONObject solr_add_doc_json) 
     139    { 
     140         
     141        //String curl_popen = "curl -X POST -H 'Content-Type: application/json'"; 
     142        //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'"; 
     143        //curl_popen += " --data-binary '"; 
     144        //curl_popen += "'" 
     145 
     146 
     147        try { 
     148            HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection())); 
     149            httpcon.setDoOutput(true); 
     150            httpcon.setRequestProperty("Content-Type", "application/json"); 
     151            httpcon.setRequestProperty("Accept", "application/json"); 
     152            httpcon.setRequestMethod("POST"); 
     153            httpcon.connect(); 
     154 
     155            byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8"); 
     156            OutputStream os = httpcon.getOutputStream(); 
     157            os.write(outputBytes); 
     158            os.close(); 
     159             
     160             
     161            // Read response 
     162            StringBuilder sb = new StringBuilder(); 
     163            BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream())); 
     164            String decodedString; 
     165            while ((decodedString = in.readLine()) != null) { 
     166                sb.append(decodedString); 
     167            } 
     168            in.close(); 
     169 
     170            JSONObject solr_status_json = new JSONObject(sb.toString()); 
     171            JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader"); 
     172            if (response_header_json != null) { 
     173                int status = response_header_json.getInt("status"); 
     174                if (status != 0) { 
     175                    System.err.println("Warning: POST request to " + post_url + " returned status " + status); 
     176                    System.err.println("Full response was: " + sb); 
     177                } 
     178            } 
     179            else { 
     180                System.err.println("Failed response to Solr POST: " + sb); 
     181            } 
     182             
     183             
     184             
     185        } 
     186        catch (Exception e) { 
     187            e.printStackTrace(); 
     188        } 
     189         
     190    } 
    117191} 
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/PagedJSON.java

    r30997 r31001  
    99import java.net.URL; 
    1010import java.util.ArrayList; 
     11import java.util.Iterator; 
    1112import java.util.Set; 
    1213 
     
    2829 
    2930 
    30 //class PagedJSON implements FlatMapFunction<String, String>  
    31 public class PagedJSON implements VoidFunction<String>  
     31class PagedJSON implements FlatMapFunction<String, JSONObject>, VoidFunction<JSONObject>  
     32//public class PagedJSON implements VoidFunction<String>  
    3233{ 
    3334    private static final long serialVersionUID = 1L; 
     
    5354    } 
    5455     
    55     public static void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2) 
    56     { 
    57         try { 
    58             BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(output_file_json_bz2); 
    59             bw.write(solr_add_doc_json.toString()); 
    60             bw.close(); 
    61         } catch (IOException e) { 
    62             e.printStackTrace(); 
    63         } catch (CompressorException e) { 
    64             e.printStackTrace(); 
    65         } 
    66     } 
    6756     
    68     public static void postSolrDoc(String post_url, JSONObject solr_add_doc_json) 
    69     { 
    70          
    71         //String curl_popen = "curl -X POST -H 'Content-Type: application/json'"; 
    72         //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'"; 
    73         //curl_popen += " --data-binary '"; 
    74         //curl_popen += "'" 
    75  
    76  
    77         try { 
    78             HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection())); 
    79             httpcon.setDoOutput(true); 
    80             httpcon.setRequestProperty("Content-Type", "application/json"); 
    81             httpcon.setRequestProperty("Accept", "application/json"); 
    82             httpcon.setRequestMethod("POST"); 
    83             httpcon.connect(); 
    84  
    85             byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8"); 
    86             OutputStream os = httpcon.getOutputStream(); 
    87             os.write(outputBytes); 
    88             os.close(); 
    89              
    90              
    91             // Read response 
    92             StringBuilder sb = new StringBuilder(); 
    93             BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream())); 
    94             String decodedString; 
    95             while ((decodedString = in.readLine()) != null) { 
    96                 sb.append(decodedString); 
    97             } 
    98             in.close(); 
    99  
    100             JSONObject solr_status_json = new JSONObject(sb.toString()); 
    101             JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader"); 
    102             if (response_header_json != null) { 
    103                 int status = response_header_json.getInt("status"); 
    104                 if (status != 0) { 
    105                     System.err.println("Warning: POST request to " + post_url + " returned status " + status); 
    106                     System.err.println("Full response was: " + sb); 
    107                 } 
    108             } 
    109             else { 
    110                 System.err.println("Failed response to Solr POST: " + sb); 
    111             } 
    112              
    113              
    114              
    115         } 
    116         catch (Exception e) { 
    117             e.printStackTrace(); 
    118         } 
    119          
    120     } 
    12157     
    122     //public Iterator<String> call(String json_file_in)  
    123     public void call(String json_file_in)  
     58    public Iterator<JSONObject> call(String json_file_in)  
     59    //public void call(String json_file_in)  
    12460    {  
    12561        JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(_input_dir + "/" + json_file_in); 
     
    14884        ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 
    14985         
    150         ArrayList<String> ids = new ArrayList<String>(ef_num_pages); 
     86        ArrayList<JSONObject> json_pages = new ArrayList<JSONObject>(ef_num_pages); 
    15187        for (int i = 0; i < ef_page_count; i++) { 
    15288            String formatted_i = String.format("page-%06d", i); 
     
    15894             
    15995            String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; 
    160             ids.add(output_json_bz2); 
     96            //ids.add(output_json_bz2); 
    16197             
    16298            if (i==0) { 
     
    165101             
    166102            JSONObject ef_page = ef_pages.getJSONObject(i); 
    167  
     103             
    168104            if (ef_page != null) { 
    169105                // Convert to Solr add form 
    170106                JSONObject solr_add_doc_json = JSONSolrTransform.generateSolrDocJSON(volume_id, page_id, ef_page); 
    171  
     107                solr_add_doc_json.put("filename_json_bz2", output_json_bz2); 
     108                 
     109                json_pages.add(solr_add_doc_json); 
    172110             
    173                 if ((_verbosity >=2) && (i==20)) { 
    174                     System.out.println("=================="); 
    175                     System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString()); 
    176                     System.out.println("=================="); 
    177                 } 
    178111                 
    179                              
    180                 if (_solr_url != null) { 
    181                     if ((_verbosity >=2) && (i==20)) { 
    182                         System.out.println("=================="); 
    183                         System.out.println("Posting to: " + _solr_url); 
    184                         System.out.println("=================="); 
    185                     } 
    186                     postSolrDoc(_solr_url, solr_add_doc_json); 
    187                 } 
    188  
    189                 if (_output_dir != null) { 
    190                     if ((_verbosity >=2) && (i==20)) { 
    191                         System.out.println("=================="); 
    192                         System.out.println("Saving to: " + _output_dir); 
    193                         System.out.println("=================="); 
    194                     } 
    195                     saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2); 
    196                 } 
    197112            } 
    198113            else { 
     
    203118         
    204119         
    205         ids.add(volume_id); 
     120        //ids.add(volume_id); 
    206121        _progress_accum.add(_progress_step); 
    207122         
    208123        //return ids.iterator(); 
     124        return json_pages.iterator(); 
    209125    } 
     126     
     127    public void call(JSONObject solr_add_doc_json)  
     128    {  
     129        String output_json_bz2 = solr_add_doc_json.getString("filename_json_bz2"); 
     130        solr_add_doc_json.remove("filename_json_bz2"); 
     131         
     132        boolean random_test = (Math.random()>0.999); // every 1000 
     133         
     134        if ((_verbosity >=2) && (random_test)) { 
     135            System.out.println("=================="); 
     136            System.out.println("Sample output Solr add JSON [random test 1/1000]: " + solr_add_doc_json.toString()); 
     137            System.out.println("=================="); 
     138        } 
     139         
     140                     
     141        if (_solr_url != null) { 
     142            if ((_verbosity >=2) && (random_test)) { 
     143                System.out.println("=================="); 
     144                System.out.println("Posting to: " + _solr_url); 
     145                System.out.println("=================="); 
     146            } 
     147            JSONSolrTransform.postSolrDoc(_solr_url, solr_add_doc_json); 
     148        } 
     149 
     150        if (_output_dir != null) { 
     151            if ((_verbosity >=2) && (random_test)) { 
     152                System.out.println("=================="); 
     153                System.out.println("Saving to: " + _output_dir); 
     154                System.out.println("=================="); 
     155            } 
     156            JSONSolrTransform.saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2); 
     157        } 
     158    } 
     159     
    210160} 
    211161 
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r30998 r31001  
    77import org.apache.spark.util.DoubleAccumulator; 
    88import org.hathitrust.extractedfeatures.PagedJSON; 
     9import org.json.JSONObject; 
    910import org.apache.spark.SparkConf; 
    1011 
     
    4142    } 
    4243 
    43     public void exec() 
     44    public void execPerVolume() 
    4445    {    
    45         String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest"; 
     46        String spark_app_name = "[Per Volume] Extract Features: Process for Solr Ingest"; 
    4647        spark_app_name += " [" + _json_list_filename + "]"; 
    4748 
     
    6162        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 
    6263         
     64        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 
     65 
     66        json_list_data.foreach(per_vol_json); 
     67         
     68        long num_ids = num_volumes; 
     69         
     70        System.out.println(""); 
     71        System.out.println("############"); 
     72        System.out.println("# Number of volume ids: " + num_ids); 
     73        System.out.println("############"); 
     74        System.out.println(""); 
     75 
     76        jsc.close(); 
     77    } 
     78 
     79    public void execPerPage() 
     80    {    
     81        String spark_app_name = "[Per Page] Extract Features: Process for Solr Ingest"; 
     82        spark_app_name += " [" + _json_list_filename + "]"; 
     83 
     84        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
     85        JavaSparkContext jsc = new JavaSparkContext(conf); 
     86         
     87        if (_verbosity >= 2) { 
     88            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 
     89            System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 
     90        } 
     91                 
     92        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 
     93 
     94        long num_volumes = json_list_data.count(); 
     95        double per_vol = 100.0/(double)num_volumes; 
     96         
     97        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 
     98         
    6399        PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 
    64         //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 
    65  
    66         json_list_data.foreach(paged_json); 
    67  
    68      
     100        JavaRDD<JSONObject> json_per_page_ids = json_list_data.flatMap(paged_json).cache(); 
     101         
     102        json_per_page_ids.foreach(paged_json); 
     103 
    69104/* 
    70105        System.out.println(""); 
     
    75110*/ 
    76111         
    77         //long num_ids = json_ids.count(); 
    78         long num_ids = num_volumes; 
    79          
    80         System.out.println(""); 
    81         System.out.println("############"); 
    82         System.out.println("# Number of page ids: " + num_ids); 
     112        long num_page_ids = json_per_page_ids.count(); 
     113         
     114        System.out.println(""); 
     115        System.out.println("############"); 
     116        System.out.println("# Number of page ids: " + num_page_ids); 
    83117        System.out.println("############"); 
    84118        System.out.println(""); 
     
    99133    } 
    100134 
     135     
     136     
     137 
    101138    public static void print_usage(HelpFormatter formatter, Options options) 
    102139    { 
     
    174211        ProcessForSolrIngest prep_for_ingest  
    175212            = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity); 
    176         prep_for_ingest.exec(); 
     213        prep_for_ingest.execPerVolume(); 
    177214    } 
    178215}