package org.hathitrust; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Iterator; import java.util.Set; import org.apache.commons.compress.compressors.CompressorException; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.util.DoubleAccumulator; import org.json.JSONArray; import org.json.JSONObject; /* class PagedJSON implements Function { private static final long serialVersionUID = 1L; public Boolean call(String s) { return s.contains("a"); } } */ //class PagedJSON implements FlatMapFunction class PagedJSON implements VoidFunction { private static final long serialVersionUID = 1L; protected String _input_dir; protected String _solr_url; protected String _output_dir; protected int _verbosity; DoubleAccumulator _progress_accum; double _progress_step; public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity, DoubleAccumulator progress_accum, double progress_step) { _input_dir = input_dir; _solr_url = solr_url; _output_dir = output_dir; _verbosity = verbosity; _progress_accum = progress_accum; _progress_step = progress_step; } protected JSONObject readJSONFile(String filename) { StringBuilder sb = new StringBuilder(); try { String str; BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename); while ((str = br.readLine()) != null) { sb.append(str); } br.close(); } catch (Exception e) { e.printStackTrace(); } JSONObject json_obj = new JSONObject(sb.toString()); return json_obj; } protected String generateSolrText(JSONObject ef_token_pos_count) { StringBuilder sb = new StringBuilder(); Iterator token_iter = ef_token_pos_count.keys(); while (token_iter.hasNext()) { String token = token_iter.next(); sb.append(token); if (token_iter.hasNext()) { sb.append(" "); } } /* Set token_keys = ef_token_pos_count.keySet(); for (String token : token_keys) { sb.append(token + " "); } */ return sb.toString(); } protected JSONObject generateSolrDocJSON(String volume_id, String page_id, JSONObject ef_page) { JSONObject solr_update_json = null; if (ef_page != null) { JSONObject ef_body = ef_page.getJSONObject("body"); if (ef_body != null) { JSONObject ef_token_pos_count = ef_body.getJSONObject("tokenPosCount"); if (ef_token_pos_count != null) { JSONObject solr_add_json = new JSONObject(); String text = generateSolrText(ef_token_pos_count); JSONObject solr_doc_json = new JSONObject(); solr_doc_json.put("id", page_id); solr_doc_json.put("volumeid_s", volume_id); solr_doc_json.put("_text_", text); solr_add_json.put("commitWithin", 5000); solr_add_json.put("doc", solr_doc_json); solr_update_json = new JSONObject(); solr_update_json.put("add",solr_add_json); } else { System.err.println("Warning: empty tokenPosCount field for '" + page_id + "'"); } } else { System.err.println("Warning: empty body field for '" + page_id + "'"); } } else { System.err.println("Warning: null page for '" + page_id + "'"); } /* /update/json/docs */ // For Reference ... // Example documentation on Solr JSON syntax: // https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers // #UploadingDatawithIndexHandlers-JSONFormattedIndexUpdates /* curl -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/my_collection/update' --data-binary ' { "add": { "doc": { "id": "DOC1", "my_boosted_field": { use a map with boost/value for a boosted field "boost": 2.3, "value": "test" }, "my_multivalued_field": [ "aaa", "bbb" ] Can use an array for a multi-valued field } }, "add": { "commitWithin": 5000, commit this document within 5 seconds "overwrite": false, don't check for existing documents with the same uniqueKey "boost": 3.45, a document boost "doc": { "f1": "v1", Can use repeated keys for a multi-valued field "f1": "v2" } }, "commit": {}, "optimize": { "waitSearcher":false }, "delete": { "id":"ID" }, delete by ID "delete": { "query":"QUERY" } delete by query }' */ return solr_update_json; } protected void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2) { try { BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_file_json_bz2); bw.write(solr_add_doc_json.toString()); bw.close(); } catch (IOException e) { e.printStackTrace(); } catch (CompressorException e) { e.printStackTrace(); } } protected void postSolrDoc(JSONObject solr_add_doc_json) { String post_url = _solr_url; //String curl_popen = "curl -X POST -H 'Content-Type: application/json'"; //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'"; //curl_popen += " --data-binary '"; //curl_popen += "'" try { HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection())); httpcon.setDoOutput(true); httpcon.setRequestProperty("Content-Type", "application/json"); httpcon.setRequestProperty("Accept", "application/json"); httpcon.setRequestMethod("POST"); httpcon.connect(); byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8"); OutputStream os = httpcon.getOutputStream(); os.write(outputBytes); os.close(); // Read response StringBuilder sb = new StringBuilder(); BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream())); String decodedString; while ((decodedString = in.readLine()) != null) { sb.append(decodedString); } in.close(); JSONObject solr_status_json = new JSONObject(sb.toString()); JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader"); if (response_header_json != null) { int status = response_header_json.getInt("status"); if (status != 0) { System.err.println("Warning: POST request to " + post_url + " returned status " + status); System.err.println("Full response was: " + sb); } } else { System.err.println("Failed response to Solr POST: " + sb); } } catch (Exception e) { e.printStackTrace(); } } //public Iterator call(String json_file_in) public void call(String json_file_in) { JSONObject extracted_feature_record = readJSONFile(json_file_in); String volume_id = extracted_feature_record.getString("id"); //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); //String title= ef_metadata.getString("title"); JSONObject ef_features = extracted_feature_record.getJSONObject("features"); int ef_page_count = ef_features.getInt("pageCount"); if (_verbosity >= 1) { System.out.println("Processing: " + json_file_in); System.out.println(" pageCount = " + ef_page_count); } JSONArray ef_pages = ef_features.getJSONArray("pages"); int ef_num_pages = ef_pages.length(); // Make directory for page-level JSON output String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2"); String page_json_dir = json_dir + "/pages"; ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); ArrayList ids = new ArrayList(ef_num_pages); for (int i = 0; i < ef_page_count; i++) { String formatted_i = String.format("page-%06d", i); String page_id = volume_id + "." + formatted_i; if (_verbosity >= 2) { System.out.println(" Page: " + page_id); } String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; ids.add(output_json_bz2); if (i==0) { System.out.println("Sample output JSON page file: " + output_json_bz2); } JSONObject ef_page = ef_pages.getJSONObject(i); if (ef_page != null) { // Convert to Solr add form JSONObject solr_add_doc_json = generateSolrDocJSON(volume_id, page_id, ef_page); if (i==20) { System.out.println("=================="); System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString()); System.out.println("=================="); } if (_solr_url != null) { if (i==20) { System.out.println("=================="); System.out.println("Posting to: " + _solr_url); System.out.println("=================="); } postSolrDoc(solr_add_doc_json); } if (_output_dir != null) { if (i==20) { System.out.println("=================="); System.out.println("Saving to: " + _output_dir); System.out.println("=================="); } saveSolrDoc(solr_add_doc_json,output_json_bz2); } } else { System.err.println("Skipping: " + page_id); } } ids.add(volume_id); _progress_accum.add(_progress_step); //return ids.iterator(); } }