[30898] | 1 | package org.hathitrust;
|
---|
| 2 |
|
---|
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.DoubleAccumulator;
import org.json.JSONArray;
import org.json.JSONObject;
|
---|
| 20 |
|
---|
| 21 | /*
|
---|
| 22 | class PagedJSON implements Function<String, Boolean> {
|
---|
| 23 |
|
---|
| 24 | private static final long serialVersionUID = 1L;
|
---|
| 25 |
|
---|
| 26 | public Boolean call(String s) { return s.contains("a"); }
|
---|
| 27 | }
|
---|
| 28 | */
|
---|
| 29 |
|
---|
[30918] | 30 |
|
---|
[30985] | 31 | //class PagedJSON implements FlatMapFunction<String, String>
|
---|
| 32 | class PagedJSON implements VoidFunction<String>
|
---|
[30898] | 33 | {
|
---|
| 34 | private static final long serialVersionUID = 1L;
|
---|
| 35 |
|
---|
| 36 | protected String _input_dir;
|
---|
[30975] | 37 | protected String _solr_url;
|
---|
[30945] | 38 | protected String _output_dir;
|
---|
[30942] | 39 | protected int _verbosity;
|
---|
[30898] | 40 |
|
---|
[30984] | 41 | DoubleAccumulator _progress_accum;
|
---|
| 42 | double _progress_step;
|
---|
| 43 |
|
---|
| 44 | public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity,
|
---|
| 45 | DoubleAccumulator progress_accum, double progress_step)
|
---|
[30898] | 46 | {
|
---|
[30945] | 47 | _input_dir = input_dir;
|
---|
[30975] | 48 | _solr_url = solr_url;
|
---|
[30945] | 49 | _output_dir = output_dir;
|
---|
| 50 | _verbosity = verbosity;
|
---|
[30984] | 51 |
|
---|
| 52 | _progress_accum = progress_accum;
|
---|
| 53 | _progress_step = progress_step;
|
---|
[30898] | 54 | }
|
---|
| 55 |
|
---|
| 56 | protected JSONObject readJSONFile(String filename)
|
---|
| 57 | {
|
---|
| 58 | StringBuilder sb = new StringBuilder();
|
---|
| 59 |
|
---|
| 60 | try {
|
---|
| 61 |
|
---|
| 62 | String str;
|
---|
[30937] | 63 | BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
|
---|
[30898] | 64 | while ((str = br.readLine()) != null) {
|
---|
| 65 | sb.append(str);
|
---|
| 66 | }
|
---|
| 67 |
|
---|
| 68 | br.close();
|
---|
| 69 | }
|
---|
| 70 | catch (Exception e) {
|
---|
| 71 | e.printStackTrace();
|
---|
| 72 | }
|
---|
| 73 |
|
---|
| 74 | JSONObject json_obj = new JSONObject(sb.toString());
|
---|
| 75 |
|
---|
[30970] | 76 |
|
---|
[30898] | 77 | return json_obj;
|
---|
| 78 | }
|
---|
| 79 |
|
---|
[30970] | 80 | protected String generateSolrText(JSONObject ef_token_pos_count)
|
---|
| 81 | {
|
---|
| 82 | StringBuilder sb = new StringBuilder();
|
---|
| 83 |
|
---|
| 84 | Iterator<String> token_iter = ef_token_pos_count.keys();
|
---|
| 85 | while (token_iter.hasNext()) {
|
---|
| 86 | String token = token_iter.next();
|
---|
| 87 |
|
---|
| 88 | sb.append(token);
|
---|
| 89 | if (token_iter.hasNext()) {
|
---|
| 90 | sb.append(" ");
|
---|
| 91 | }
|
---|
| 92 | }
|
---|
| 93 |
|
---|
| 94 | /*
|
---|
| 95 | Set<String> token_keys = ef_token_pos_count.keySet();
|
---|
| 96 | for (String token : token_keys) {
|
---|
| 97 | sb.append(token + " ");
|
---|
| 98 | }
|
---|
| 99 | */
|
---|
| 100 |
|
---|
| 101 | return sb.toString();
|
---|
| 102 | }
|
---|
| 103 |
|
---|
| 104 | protected JSONObject generateSolrDocJSON(String volume_id, String page_id, JSONObject ef_page)
|
---|
| 105 | {
|
---|
[30974] | 106 | JSONObject solr_update_json = null;
|
---|
[30970] | 107 |
|
---|
| 108 | if (ef_page != null) {
|
---|
| 109 | JSONObject ef_body = ef_page.getJSONObject("body");
|
---|
| 110 | if (ef_body != null) {
|
---|
| 111 | JSONObject ef_token_pos_count = ef_body.getJSONObject("tokenPosCount");
|
---|
| 112 | if (ef_token_pos_count != null) {
|
---|
| 113 |
|
---|
[30974] | 114 | JSONObject solr_add_json = new JSONObject();
|
---|
[30970] | 115 |
|
---|
| 116 | String text = generateSolrText(ef_token_pos_count);
|
---|
| 117 |
|
---|
| 118 | JSONObject solr_doc_json = new JSONObject();
|
---|
| 119 | solr_doc_json.put("id", page_id);
|
---|
[30971] | 120 | solr_doc_json.put("volumeid_s", volume_id);
|
---|
| 121 | solr_doc_json.put("_text_", text);
|
---|
[30970] | 122 |
|
---|
[30974] | 123 | solr_add_json.put("commitWithin", 5000);
|
---|
| 124 | solr_add_json.put("doc", solr_doc_json);
|
---|
| 125 |
|
---|
| 126 | solr_update_json = new JSONObject();
|
---|
| 127 | solr_update_json.put("add",solr_add_json);
|
---|
| 128 |
|
---|
[30970] | 129 | }
|
---|
| 130 | else {
|
---|
| 131 | System.err.println("Warning: empty tokenPosCount field for '" + page_id + "'");
|
---|
| 132 | }
|
---|
| 133 | }
|
---|
| 134 | else {
|
---|
| 135 | System.err.println("Warning: empty body field for '" + page_id + "'");
|
---|
| 136 | }
|
---|
| 137 |
|
---|
| 138 | }
|
---|
| 139 | else {
|
---|
| 140 | System.err.println("Warning: null page for '" + page_id + "'");
|
---|
| 141 | }
|
---|
| 142 |
|
---|
| 143 |
|
---|
| 144 | /*
|
---|
| 145 |
|
---|
| 146 | /update/json/docs
|
---|
| 147 | */
|
---|
| 148 |
|
---|
| 149 | // For Reference ...
|
---|
| 150 | // Example documentation on Solr JSON syntax:
|
---|
| 151 | // https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers
|
---|
| 152 | // #UploadingDatawithIndexHandlers-JSONFormattedIndexUpdates
|
---|
| 153 |
|
---|
| 154 | /*
|
---|
| 155 | curl -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/my_collection/update' --data-binary '
|
---|
| 156 | {
|
---|
| 157 | "add": {
|
---|
| 158 | "doc": {
|
---|
| 159 | "id": "DOC1",
|
---|
| 160 | "my_boosted_field": { use a map with boost/value for a boosted field
|
---|
| 161 | "boost": 2.3,
|
---|
| 162 | "value": "test"
|
---|
| 163 | },
|
---|
| 164 | "my_multivalued_field": [ "aaa", "bbb" ] Can use an array for a multi-valued field
|
---|
| 165 | }
|
---|
| 166 | },
|
---|
| 167 | "add": {
|
---|
| 168 | "commitWithin": 5000, commit this document within 5 seconds
|
---|
| 169 | "overwrite": false, don't check for existing documents with the same uniqueKey
|
---|
| 170 | "boost": 3.45, a document boost
|
---|
| 171 | "doc": {
|
---|
| 172 | "f1": "v1", Can use repeated keys for a multi-valued field
|
---|
| 173 | "f1": "v2"
|
---|
| 174 | }
|
---|
| 175 | },
|
---|
| 176 |
|
---|
| 177 | "commit": {},
|
---|
| 178 | "optimize": { "waitSearcher":false },
|
---|
| 179 |
|
---|
| 180 | "delete": { "id":"ID" }, delete by ID
|
---|
| 181 | "delete": { "query":"QUERY" } delete by query
|
---|
| 182 | }'
|
---|
| 183 | */
|
---|
| 184 |
|
---|
[30974] | 185 | return solr_update_json;
|
---|
[30970] | 186 | }
|
---|
| 187 |
|
---|
[30973] | 188 | protected void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)
|
---|
| 189 | {
|
---|
| 190 | try {
|
---|
| 191 | BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_file_json_bz2);
|
---|
| 192 | bw.write(solr_add_doc_json.toString());
|
---|
| 193 | bw.close();
|
---|
| 194 | } catch (IOException e) {
|
---|
| 195 | e.printStackTrace();
|
---|
| 196 | } catch (CompressorException e) {
|
---|
| 197 | e.printStackTrace();
|
---|
| 198 | }
|
---|
| 199 | }
|
---|
| 200 |
|
---|
[30970] | 201 | protected void postSolrDoc(JSONObject solr_add_doc_json)
|
---|
| 202 | {
|
---|
[30979] | 203 | String post_url = _solr_url;
|
---|
| 204 |
|
---|
[30970] | 205 | //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";
|
---|
| 206 | //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";
|
---|
| 207 | //curl_popen += " --data-binary '";
|
---|
| 208 | //curl_popen += "'"
|
---|
| 209 |
|
---|
| 210 |
|
---|
| 211 | try {
|
---|
| 212 | HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));
|
---|
| 213 | httpcon.setDoOutput(true);
|
---|
| 214 | httpcon.setRequestProperty("Content-Type", "application/json");
|
---|
| 215 | httpcon.setRequestProperty("Accept", "application/json");
|
---|
| 216 | httpcon.setRequestMethod("POST");
|
---|
| 217 | httpcon.connect();
|
---|
| 218 |
|
---|
| 219 | byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");
|
---|
| 220 | OutputStream os = httpcon.getOutputStream();
|
---|
| 221 | os.write(outputBytes);
|
---|
| 222 | os.close();
|
---|
[30980] | 223 |
|
---|
| 224 |
|
---|
| 225 | // Read response
|
---|
[30984] | 226 | StringBuilder sb = new StringBuilder();
|
---|
| 227 | BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));
|
---|
[30980] | 228 | String decodedString;
|
---|
| 229 | while ((decodedString = in.readLine()) != null) {
|
---|
[30984] | 230 | sb.append(decodedString);
|
---|
[30980] | 231 | }
|
---|
| 232 | in.close();
|
---|
| 233 |
|
---|
[30984] | 234 | JSONObject solr_status_json = new JSONObject(sb.toString());
|
---|
| 235 | JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader");
|
---|
| 236 | if (response_header_json != null) {
|
---|
| 237 | int status = response_header_json.getInt("status");
|
---|
| 238 | if (status != 0) {
|
---|
| 239 | System.err.println("Warning: POST request to " + post_url + " returned status " + status);
|
---|
| 240 | System.err.println("Full response was: " + sb);
|
---|
| 241 | }
|
---|
| 242 | }
|
---|
| 243 | else {
|
---|
| 244 | System.err.println("Failed response to Solr POST: " + sb);
|
---|
| 245 | }
|
---|
| 246 |
|
---|
| 247 |
|
---|
| 248 |
|
---|
[30970] | 249 | }
|
---|
| 250 | catch (Exception e) {
|
---|
| 251 | e.printStackTrace();
|
---|
| 252 | }
|
---|
| 253 |
|
---|
| 254 | }
|
---|
[30985] | 255 |
|
---|
| 256 | //public Iterator<String> call(String json_file_in)
|
---|
| 257 | public void call(String json_file_in)
|
---|
[30898] | 258 | {
|
---|
[30937] | 259 | JSONObject extracted_feature_record = readJSONFile(json_file_in);
|
---|
[30898] | 260 |
|
---|
[30970] | 261 | String volume_id = extracted_feature_record.getString("id");
|
---|
[30898] | 262 |
|
---|
[30945] | 263 | //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
|
---|
[30970] | 264 | //String title= ef_metadata.getString("title");
|
---|
| 265 |
|
---|
[30898] | 266 | JSONObject ef_features = extracted_feature_record.getJSONObject("features");
|
---|
| 267 |
|
---|
| 268 |
|
---|
| 269 | int ef_page_count = ef_features.getInt("pageCount");
|
---|
| 270 |
|
---|
[30942] | 271 | if (_verbosity >= 1) {
|
---|
| 272 | System.out.println("Processing: " + json_file_in);
|
---|
| 273 | System.out.println(" pageCount = " + ef_page_count);
|
---|
| 274 | }
|
---|
| 275 |
|
---|
[30898] | 276 | JSONArray ef_pages = ef_features.getJSONArray("pages");
|
---|
| 277 | int ef_num_pages = ef_pages.length();
|
---|
| 278 |
|
---|
[30945] | 279 | // Make directory for page-level JSON output
|
---|
| 280 | String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2");
|
---|
| 281 | String page_json_dir = json_dir + "/pages";
|
---|
[30951] | 282 | ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
|
---|
[30945] | 283 |
|
---|
[30898] | 284 | ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
|
---|
| 285 | for (int i = 0; i < ef_page_count; i++) {
|
---|
[30945] | 286 | String formatted_i = String.format("page-%06d", i);
|
---|
[30970] | 287 | String page_id = volume_id + "." + formatted_i;
|
---|
[30945] | 288 |
|
---|
| 289 | if (_verbosity >= 2) {
|
---|
| 290 | System.out.println(" Page: " + page_id);
|
---|
| 291 | }
|
---|
| 292 |
|
---|
[30949] | 293 | String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2";
|
---|
[30946] | 294 | ids.add(output_json_bz2);
|
---|
| 295 |
|
---|
[30945] | 296 | if (i==0) {
|
---|
[30946] | 297 | System.out.println("Sample output JSON page file: " + output_json_bz2);
|
---|
[30945] | 298 | }
|
---|
[30951] | 299 |
|
---|
| 300 | JSONObject ef_page = ef_pages.getJSONObject(i);
|
---|
[30970] | 301 |
|
---|
| 302 | if (ef_page != null) {
|
---|
| 303 | // Convert to Solr add form
|
---|
| 304 | JSONObject solr_add_doc_json = generateSolrDocJSON(volume_id, page_id, ef_page);
|
---|
| 305 |
|
---|
| 306 | if (i==20) {
|
---|
[30980] | 307 | System.out.println("==================");
|
---|
[30970] | 308 | System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
|
---|
| 309 | System.out.println("==================");
|
---|
| 310 | }
|
---|
| 311 |
|
---|
[30973] | 312 |
|
---|
[30975] | 313 | if (_solr_url != null) {
|
---|
[30978] | 314 | if (i==20) {
|
---|
[30980] | 315 | System.out.println("==================");
|
---|
[30978] | 316 | System.out.println("Posting to: " + _solr_url);
|
---|
[30980] | 317 | System.out.println("==================");
|
---|
[30978] | 318 | }
|
---|
[30975] | 319 | postSolrDoc(solr_add_doc_json);
|
---|
| 320 | }
|
---|
| 321 |
|
---|
| 322 | if (_output_dir != null) {
|
---|
[30978] | 323 | if (i==20) {
|
---|
[30980] | 324 | System.out.println("==================");
|
---|
[30978] | 325 | System.out.println("Saving to: " + _output_dir);
|
---|
[30980] | 326 | System.out.println("==================");
|
---|
[30978] | 327 | }
|
---|
[30975] | 328 | saveSolrDoc(solr_add_doc_json,output_json_bz2);
|
---|
| 329 | }
|
---|
[30951] | 330 | }
|
---|
[30970] | 331 | else {
|
---|
| 332 | System.err.println("Skipping: " + page_id);
|
---|
| 333 | }
|
---|
[30951] | 334 |
|
---|
[30898] | 335 | }
|
---|
| 336 |
|
---|
| 337 |
|
---|
[30970] | 338 | ids.add(volume_id);
|
---|
[30984] | 339 | _progress_accum.add(_progress_step);
|
---|
[30985] | 340 |
|
---|
| 341 | //return ids.iterator();
|
---|
[30898] | 342 | }
|
---|
| 343 | }
|
---|
| 344 |
|
---|