Changeset 31001 for other-projects/hathitrust
- Timestamp: 2016-10-30T23:51:07+13:00 (7 years ago)
- Location: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures
- Files:
  - 1 added
  - 3 edited
Legend:
- Unmodified
- Added
- Removed
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/JSONSolrTransform.java
r30996 r31001 1 1 package org.hathitrust.extractedfeatures; 2 2 3 import java.io.BufferedReader; 4 import java.io.BufferedWriter; 5 import java.io.IOException; 6 import java.io.InputStreamReader; 7 import java.io.OutputStream; 8 import java.net.HttpURLConnection; 9 import java.net.URL; 3 10 import java.util.Iterator; 4 11 12 import org.apache.commons.compress.compressors.CompressorException; 5 13 import org.json.JSONObject; 6 14 … … 115 123 } 116 124 125 public static void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2) 126 { 127 try { 128 BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(output_file_json_bz2); 129 bw.write(solr_add_doc_json.toString()); 130 bw.close(); 131 } catch (IOException e) { 132 e.printStackTrace(); 133 } catch (CompressorException e) { 134 e.printStackTrace(); 135 } 136 } 137 138 public static void postSolrDoc(String post_url, JSONObject solr_add_doc_json) 139 { 140 141 //String curl_popen = "curl -X POST -H 'Content-Type: application/json'"; 142 //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'"; 143 //curl_popen += " --data-binary '"; 144 //curl_popen += "'" 145 146 147 try { 148 HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection())); 149 httpcon.setDoOutput(true); 150 httpcon.setRequestProperty("Content-Type", "application/json"); 151 httpcon.setRequestProperty("Accept", "application/json"); 152 httpcon.setRequestMethod("POST"); 153 httpcon.connect(); 154 155 byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8"); 156 OutputStream os = httpcon.getOutputStream(); 157 os.write(outputBytes); 158 os.close(); 159 160 161 // Read response 162 StringBuilder sb = new StringBuilder(); 163 BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream())); 164 String decodedString; 165 while ((decodedString = in.readLine()) != null) { 166 sb.append(decodedString); 167 } 168 in.close(); 169 170 JSONObject solr_status_json = 
new JSONObject(sb.toString()); 171 JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader"); 172 if (response_header_json != null) { 173 int status = response_header_json.getInt("status"); 174 if (status != 0) { 175 System.err.println("Warning: POST request to " + post_url + " returned status " + status); 176 System.err.println("Full response was: " + sb); 177 } 178 } 179 else { 180 System.err.println("Failed response to Solr POST: " + sb); 181 } 182 183 184 185 } 186 catch (Exception e) { 187 e.printStackTrace(); 188 } 189 190 } 117 191 } -
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/PagedJSON.java
r30997 r31001 9 9 import java.net.URL; 10 10 import java.util.ArrayList; 11 import java.util.Iterator; 11 12 import java.util.Set; 12 13 … … 28 29 29 30 30 //class PagedJSON implements FlatMapFunction<String, String>31 public class PagedJSON implements VoidFunction<String>31 class PagedJSON implements FlatMapFunction<String, JSONObject>, VoidFunction<JSONObject> 32 //public class PagedJSON implements VoidFunction<String> 32 33 { 33 34 private static final long serialVersionUID = 1L; … … 53 54 } 54 55 55 public static void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)56 {57 try {58 BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(output_file_json_bz2);59 bw.write(solr_add_doc_json.toString());60 bw.close();61 } catch (IOException e) {62 e.printStackTrace();63 } catch (CompressorException e) {64 e.printStackTrace();65 }66 }67 56 68 public static void postSolrDoc(String post_url, JSONObject solr_add_doc_json)69 {70 71 //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";72 //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";73 //curl_popen += " --data-binary '";74 //curl_popen += "'"75 76 77 try {78 HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));79 httpcon.setDoOutput(true);80 httpcon.setRequestProperty("Content-Type", "application/json");81 httpcon.setRequestProperty("Accept", "application/json");82 httpcon.setRequestMethod("POST");83 httpcon.connect();84 85 byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");86 OutputStream os = httpcon.getOutputStream();87 os.write(outputBytes);88 os.close();89 90 91 // Read response92 StringBuilder sb = new StringBuilder();93 BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));94 String decodedString;95 while ((decodedString = in.readLine()) != null) {96 sb.append(decodedString);97 }98 in.close();99 100 JSONObject solr_status_json = new 
JSONObject(sb.toString());101 JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader");102 if (response_header_json != null) {103 int status = response_header_json.getInt("status");104 if (status != 0) {105 System.err.println("Warning: POST request to " + post_url + " returned status " + status);106 System.err.println("Full response was: " + sb);107 }108 }109 else {110 System.err.println("Failed response to Solr POST: " + sb);111 }112 113 114 115 }116 catch (Exception e) {117 e.printStackTrace();118 }119 120 }121 57 122 //public Iterator<String> call(String json_file_in)123 public void call(String json_file_in)58 public Iterator<JSONObject> call(String json_file_in) 59 //public void call(String json_file_in) 124 60 { 125 61 JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(_input_dir + "/" + json_file_in); … … 148 84 ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 149 85 150 ArrayList< String> ids = new ArrayList<String>(ef_num_pages);86 ArrayList<JSONObject> json_pages = new ArrayList<JSONObject>(ef_num_pages); 151 87 for (int i = 0; i < ef_page_count; i++) { 152 88 String formatted_i = String.format("page-%06d", i); … … 158 94 159 95 String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; 160 ids.add(output_json_bz2);96 //ids.add(output_json_bz2); 161 97 162 98 if (i==0) { … … 165 101 166 102 JSONObject ef_page = ef_pages.getJSONObject(i); 167 103 168 104 if (ef_page != null) { 169 105 // Convert to Solr add form 170 106 JSONObject solr_add_doc_json = JSONSolrTransform.generateSolrDocJSON(volume_id, page_id, ef_page); 171 107 solr_add_doc_json.put("filename_json_bz2", output_json_bz2); 108 109 json_pages.add(solr_add_doc_json); 172 110 173 if ((_verbosity >=2) && (i==20)) {174 System.out.println("==================");175 System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());176 System.out.println("==================");177 }178 111 179 180 if 
(_solr_url != null) {181 if ((_verbosity >=2) && (i==20)) {182 System.out.println("==================");183 System.out.println("Posting to: " + _solr_url);184 System.out.println("==================");185 }186 postSolrDoc(_solr_url, solr_add_doc_json);187 }188 189 if (_output_dir != null) {190 if ((_verbosity >=2) && (i==20)) {191 System.out.println("==================");192 System.out.println("Saving to: " + _output_dir);193 System.out.println("==================");194 }195 saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2);196 }197 112 } 198 113 else { … … 203 118 204 119 205 ids.add(volume_id);120 //ids.add(volume_id); 206 121 _progress_accum.add(_progress_step); 207 122 208 123 //return ids.iterator(); 124 return json_pages.iterator(); 209 125 } 126 127 public void call(JSONObject solr_add_doc_json) 128 { 129 String output_json_bz2 = solr_add_doc_json.getString("filename_json_bz2"); 130 solr_add_doc_json.remove("filename_json_bz2"); 131 132 boolean random_test = (Math.random()>0.999); // every 1000 133 134 if ((_verbosity >=2) && (random_test)) { 135 System.out.println("=================="); 136 System.out.println("Sample output Solr add JSON [random test 1/1000]: " + solr_add_doc_json.toString()); 137 System.out.println("=================="); 138 } 139 140 141 if (_solr_url != null) { 142 if ((_verbosity >=2) && (random_test)) { 143 System.out.println("=================="); 144 System.out.println("Posting to: " + _solr_url); 145 System.out.println("=================="); 146 } 147 JSONSolrTransform.postSolrDoc(_solr_url, solr_add_doc_json); 148 } 149 150 if (_output_dir != null) { 151 if ((_verbosity >=2) && (random_test)) { 152 System.out.println("=================="); 153 System.out.println("Saving to: " + _output_dir); 154 System.out.println("=================="); 155 } 156 JSONSolrTransform.saveSolrDoc(solr_add_doc_json, _output_dir + "/" + output_json_bz2); 157 } 158 } 159 210 160 } 211 161 -
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
r30998 r31001 7 7 import org.apache.spark.util.DoubleAccumulator; 8 8 import org.hathitrust.extractedfeatures.PagedJSON; 9 import org.json.JSONObject; 9 10 import org.apache.spark.SparkConf; 10 11 … … 41 42 } 42 43 43 public void exec ()44 public void execPerVolume() 44 45 { 45 String spark_app_name = " HathiTrust Extract Features: Preparefor Solr Ingest";46 String spark_app_name = "[Per Volume] Extract Features: Process for Solr Ingest"; 46 47 spark_app_name += " [" + _json_list_filename + "]"; 47 48 … … 61 62 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 62 63 64 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 65 66 json_list_data.foreach(per_vol_json); 67 68 long num_ids = num_volumes; 69 70 System.out.println(""); 71 System.out.println("############"); 72 System.out.println("# Number of volume ids: " + num_ids); 73 System.out.println("############"); 74 System.out.println(""); 75 76 jsc.close(); 77 } 78 79 public void execPerPage() 80 { 81 String spark_app_name = "[Per Page] Extract Features: Process for Solr Ingest"; 82 spark_app_name += " [" + _json_list_filename + "]"; 83 84 SparkConf conf = new SparkConf().setAppName(spark_app_name); 85 JavaSparkContext jsc = new JavaSparkContext(conf); 86 87 if (_verbosity >= 2) { 88 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 89 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 90 } 91 92 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 93 94 long num_volumes = json_list_data.count(); 95 double per_vol = 100.0/(double)num_volumes; 96 97 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 98 63 99 PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 64 //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 65 66 
json_list_data.foreach(paged_json); 67 68 100 JavaRDD<JSONObject> json_per_page_ids = json_list_data.flatMap(paged_json).cache(); 101 102 json_per_page_ids.foreach(paged_json); 103 69 104 /* 70 105 System.out.println(""); … … 75 110 */ 76 111 77 //long num_ids = json_ids.count(); 78 long num_ids = num_volumes; 79 80 System.out.println(""); 81 System.out.println("############"); 82 System.out.println("# Number of page ids: " + num_ids); 112 long num_page_ids = json_per_page_ids.count(); 113 114 System.out.println(""); 115 System.out.println("############"); 116 System.out.println("# Number of page ids: " + num_page_ids); 83 117 System.out.println("############"); 84 118 System.out.println(""); … … 99 133 } 100 134 135 136 137 101 138 public static void print_usage(HelpFormatter formatter, Options options) 102 139 { … … 174 211 ProcessForSolrIngest prep_for_ingest 175 212 = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity); 176 prep_for_ingest.exec ();213 prep_for_ingest.execPerVolume(); 177 214 } 178 215 }
Note: See TracChangeset for help on using the changeset viewer.