Changeset 31001


Ignore:
Timestamp:
2016-10-30T23:51:07+13:00 (6 years ago)
Author:
davidb
Message:

Code to work per-volume and per-page

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures
Files:
1 added
3 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/JSONSolrTransform.java

    r30996 r31001  
    11package org.hathitrust.extractedfeatures;
    22
     3import java.io.BufferedReader;
     4import java.io.BufferedWriter;
     5import java.io.IOException;
     6import java.io.InputStreamReader;
     7import java.io.OutputStream;
     8import java.net.HttpURLConnection;
     9import java.net.URL;
    310import java.util.Iterator;
    411
     12import org.apache.commons.compress.compressors.CompressorException;
    513import org.json.JSONObject;
    614
     
    115123    }
    116124
     125    public static void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)
     126    {
     127        try {
     128            BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(output_file_json_bz2);
     129            bw.write(solr_add_doc_json.toString());
     130            bw.close();
     131        } catch (IOException e) {
     132            e.printStackTrace();
     133        } catch (CompressorException e) {
     134            e.printStackTrace();
     135        }
     136    }
     137   
     138    public static void postSolrDoc(String post_url, JSONObject solr_add_doc_json)
     139    {
     140       
     141        //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";
     142        //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";
     143        //curl_popen += " --data-binary '";
     144        //curl_popen += "'"
     145
     146
     147        try {
     148            HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));
     149            httpcon.setDoOutput(true);
     150            httpcon.setRequestProperty("Content-Type", "application/json");
     151            httpcon.setRequestProperty("Accept", "application/json");
     152            httpcon.setRequestMethod("POST");
     153            httpcon.connect();
     154
     155            byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");
     156            OutputStream os = httpcon.getOutputStream();
     157            os.write(outputBytes);
     158            os.close();
     159           
     160           
     161            // Read response
     162            StringBuilder sb = new StringBuilder();
     163            BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));
     164            String decodedString;
     165            while ((decodedString = in.readLine()) != null) {
     166                sb.append(decodedString);
     167            }
     168            in.close();
     169
     170            JSONObject solr_status_json = new JSONObject(sb.toString());
     171            JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader");
     172            if (response_header_json != null) {
     173                int status = response_header_json.getInt("status");
     174                if (status != 0) {
     175                    System.err.println("Warning: POST request to " + post_url + " returned status " + status);
     176                    System.err.println("Full response was: " + sb);
     177                }
     178            }
     179            else {
     180                System.err.println("Failed response to Solr POST: " + sb);
     181            }
     182           
     183           
     184           
     185        }
     186        catch (Exception e) {
     187            e.printStackTrace();
     188        }
     189       
     190    }
    117191}
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/PagedJSON.java

    r30997 r31001  
    99import java.net.URL;
    1010import java.util.ArrayList;
     11import java.util.Iterator;
    1112import java.util.Set;
    1213
     
    2829
    2930
    30 //class PagedJSON implements FlatMapFunction<String, String>
    31 public class PagedJSON implements VoidFunction<String>
     31class PagedJSON implements FlatMapFunction<String, JSONObject>, VoidFunction<JSONObject>
     32//public class PagedJSON implements VoidFunction<String>
    3233{
    3334    private static final long serialVersionUID = 1L;
     
    5354    }
    5455   
    55     public static void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)
    56     {
    57         try {
    58             BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(output_file_json_bz2);
    59             bw.write(solr_add_doc_json.toString());
    60             bw.close();
    61         } catch (IOException e) {
    62             e.printStackTrace();
    63         } catch (CompressorException e) {
    64             e.printStackTrace();
    65         }
    66     }
    6756   
    68     public static void postSolrDoc(String post_url, JSONObject solr_add_doc_json)
    69     {
    70        
    71         //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";
    72         //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";
    73         //curl_popen += " --data-binary '";
    74         //curl_popen += "'"
    75 
    76 
    77         try {
    78             HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));
    79             httpcon.setDoOutput(true);
    80             httpcon.setRequestProperty("Content-Type", "application/json");
    81             httpcon.setRequestProperty("Accept", "application/json");
    82             httpcon.setRequestMethod("POST");
    83             httpcon.connect();
    84 
    85             byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");
    86             OutputStream os = httpcon.getOutputStream();
    87             os.write(outputBytes);
    88             os.close();
    89            
    90            
    91             // Read response
    92             StringBuilder sb = new StringBuilder();
    93             BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));
    94             String decodedString;
    95             while ((decodedString = in.readLine()) != null) {
    96                 sb.append(decodedString);
    97             }
    98             in.close();
    99 
    100             JSONObject solr_status_json = new JSONObject(sb.toString());
    101             JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader");
    102             if (response_header_json != null) {
    103                 int status = response_header_json.getInt("status");
    104                 if (status != 0) {
    105                     System.err.println("Warning: POST request to " + post_url + " returned status " + status);
    106                     System.err.println("Full response was: " + sb);
    107                 }
    108             }
    109             else {
    110                 System.err.println("Failed response to Solr POST: " + sb);
    111             }
    112            
    113            
    114            
    115         }
    116         catch (Exception e) {
    117             e.printStackTrace();
    118         }
    119        
    120     }
    12157   
    122     //public Iterator<String> call(String json_file_in)
    123     public void call(String json_file_in)
     58    public Iterator<JSONObject> call(String json_file_in)
     59    //public void call(String json_file_in)
    12460    {
    12561        JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(_input_dir + "/" + json_file_in);
     
    14884        ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
    14985       
    150         ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
     86        ArrayList<JSONObject> json_pages = new ArrayList<JSONObject>(ef_num_pages);
    15187        for (int i = 0; i < ef_page_count; i++) {
    15288            String formatted_i = String.format("page-%06d", i);
     
    15894           
    15995            String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2";
    160             ids.add(output_json_bz2);
     96            //ids.add(output_json_bz2);
    16197           
    16298            if (i==0) {
     
    165101           
    166102            JSONObject ef_page = ef_pages.getJSONObject(i);
    167 
     103           
    168104            if (ef_page != null) {
    169105                // Convert to Solr add form
    170106                JSONObject solr_add_doc_json = JSONSolrTransform.generateSolrDocJSON(volume_id, page_id, ef_page);
    171 
     107                solr_add_doc_json.put("filename_json_bz2", output_json_bz2);
     108               
     109                json_pages.add(solr_add_doc_json);
    172110           
    173                 if ((_verbosity >=2) && (i==20)) {
    174                     System.out.println("==================");
    175                     System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
    176                     System.out.println("==================");
    177                 }
    178111               
    179                            
    180                 if (_solr_url != null) {
    181                     if ((_verbosity >=2) && (i==20)) {
    182                         System.out.println("==================");
    183                         System.out.println("Posting to: " + _solr_url);
    184                         System.out.println("==================");
    185                     }
    186                     postSolrDoc(_solr_url, solr_add_doc_json);
    187                 }
    188 
    189                 if (_output_dir != null) {
    190                     if ((_verbosity >=2) && (i==20)) {
    191                         System.out.println("==================");
    192                         System.out.println("Saving to: " + _output_dir);
    193                         System.out.println("==================");
    194                     }
    195                     saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2);
    196                 }
    197112            }
    198113            else {
     
    203118       
    204119       
    205         ids.add(volume_id);
     120        //ids.add(volume_id);
    206121        _progress_accum.add(_progress_step);
    207122       
    208123        //return ids.iterator();
     124        return json_pages.iterator();
    209125    }
     126   
     127    public void call(JSONObject solr_add_doc_json)
     128    {
     129        String output_json_bz2 = solr_add_doc_json.getString("filename_json_bz2");
     130        solr_add_doc_json.remove("filename_json_bz2");
     131       
     132        boolean random_test = (Math.random()>0.999); // every 1000
     133       
     134        if ((_verbosity >=2) && (random_test)) {
     135            System.out.println("==================");
     136            System.out.println("Sample output Solr add JSON [random test 1/1000]: " + solr_add_doc_json.toString());
     137            System.out.println("==================");
     138        }
     139       
     140                   
     141        if (_solr_url != null) {
     142            if ((_verbosity >=2) && (random_test)) {
     143                System.out.println("==================");
     144                System.out.println("Posting to: " + _solr_url);
     145                System.out.println("==================");
     146            }
     147            JSONSolrTransform.postSolrDoc(_solr_url, solr_add_doc_json);
     148        }
     149
     150        if (_output_dir != null) {
     151            if ((_verbosity >=2) && (random_test)) {
     152                System.out.println("==================");
     153                System.out.println("Saving to: " + _output_dir);
     154                System.out.println("==================");
     155            }
     156            JSONSolrTransform.saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2);
     157        }
     158    }
     159   
    210160}
    211161
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r30998 r31001  
    77import org.apache.spark.util.DoubleAccumulator;
    88import org.hathitrust.extractedfeatures.PagedJSON;
     9import org.json.JSONObject;
    910import org.apache.spark.SparkConf;
    1011
     
    4142    }
    4243
    43     public void exec()
     44    public void execPerVolume()
    4445    {   
    45         String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
     46        String spark_app_name = "[Per Volume] Extract Features: Process for Solr Ingest";
    4647        spark_app_name += " [" + _json_list_filename + "]";
    4748
     
    6162        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
    6263       
     64        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
     65
     66        json_list_data.foreach(per_vol_json);
     67       
     68        long num_ids = num_volumes;
     69       
     70        System.out.println("");
     71        System.out.println("############");
     72        System.out.println("# Number of volume ids: " + num_ids);
     73        System.out.println("############");
     74        System.out.println("");
     75
     76        jsc.close();
     77    }
     78
     79    public void execPerPage()
     80    {   
     81        String spark_app_name = "[Per Page] Extract Features: Process for Solr Ingest";
     82        spark_app_name += " [" + _json_list_filename + "]";
     83
     84        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     85        JavaSparkContext jsc = new JavaSparkContext(conf);
     86       
     87        if (_verbosity >= 2) {
     88            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
     89            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
     90        }
     91               
     92        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
     93
     94        long num_volumes = json_list_data.count();
     95        double per_vol = 100.0/(double)num_volumes;
     96       
     97        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
     98       
    6399        PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
    64         //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
    65 
    66         json_list_data.foreach(paged_json);
    67 
    68    
     100        JavaRDD<JSONObject> json_per_page_ids = json_list_data.flatMap(paged_json).cache();
     101       
     102        json_per_page_ids.foreach(paged_json);
     103
    69104/*
    70105        System.out.println("");
     
    75110*/
    76111       
    77         //long num_ids = json_ids.count();
    78         long num_ids = num_volumes;
    79        
    80         System.out.println("");
    81         System.out.println("############");
    82         System.out.println("# Number of page ids: " + num_ids);
     112        long num_page_ids = json_per_page_ids.count();
     113       
     114        System.out.println("");
     115        System.out.println("############");
     116        System.out.println("# Number of page ids: " + num_page_ids);
    83117        System.out.println("############");
    84118        System.out.println("");
     
    99133    }
    100134
     135   
     136   
     137
    101138    public static void print_usage(HelpFormatter formatter, Options options)
    102139    {
     
    174211        ProcessForSolrIngest prep_for_ingest
    175212            = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);
    176         prep_for_ingest.exec();
     213        prep_for_ingest.execPerVolume();
    177214    }
    178215}
Note: See TracChangeset for help on using the changeset viewer.