[30898] | 1 | package org.hathitrust;
|
---|
| 2 |
|
---|
| 3 | import java.io.BufferedReader;
|
---|
[30951] | 4 | import java.io.BufferedWriter;
|
---|
| 5 | import java.io.IOException;
|
---|
[30970] | 6 | import java.io.OutputStream;
|
---|
| 7 | import java.net.HttpURLConnection;
|
---|
| 8 | import java.net.URL;
|
---|
[30898] | 9 | import java.util.ArrayList;
|
---|
| 10 | import java.util.Iterator;
|
---|
[30970] | 11 | import java.util.Set;
|
---|
[30898] | 12 |
|
---|
[30951] | 13 | import org.apache.commons.compress.compressors.CompressorException;
|
---|
[30898] | 14 | import org.apache.spark.api.java.function.FlatMapFunction;
|
---|
| 15 | import org.json.JSONArray;
|
---|
| 16 | import org.json.JSONObject;
|
---|
| 17 |
|
---|
| 18 | /*
|
---|
| 19 | class PagedJSON implements Function<String, Boolean> {
|
---|
| 20 |
|
---|
| 21 | private static final long serialVersionUID = 1L;
|
---|
| 22 |
|
---|
| 23 | public Boolean call(String s) { return s.contains("a"); }
|
---|
| 24 | }
|
---|
| 25 | */
|
---|
| 26 |
|
---|
[30918] | 27 |
|
---|
[30898] | 28 | class PagedJSON implements FlatMapFunction<String, String>
|
---|
| 29 | {
|
---|
| 30 | private static final long serialVersionUID = 1L;
|
---|
| 31 |
|
---|
| 32 | protected String _input_dir;
|
---|
[30945] | 33 | protected String _output_dir;
|
---|
[30942] | 34 | protected int _verbosity;
|
---|
[30898] | 35 |
|
---|
[30945] | 36 | public PagedJSON(String input_dir, String output_dir, int verbosity)
|
---|
[30898] | 37 | {
|
---|
[30945] | 38 | _input_dir = input_dir;
|
---|
| 39 | _output_dir = output_dir;
|
---|
| 40 | _verbosity = verbosity;
|
---|
[30898] | 41 | }
|
---|
| 42 |
|
---|
| 43 | protected JSONObject readJSONFile(String filename)
|
---|
| 44 | {
|
---|
| 45 | StringBuilder sb = new StringBuilder();
|
---|
| 46 |
|
---|
| 47 | try {
|
---|
| 48 |
|
---|
| 49 | String str;
|
---|
[30937] | 50 | BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
|
---|
[30898] | 51 | while ((str = br.readLine()) != null) {
|
---|
| 52 | sb.append(str);
|
---|
| 53 | }
|
---|
| 54 |
|
---|
| 55 | br.close();
|
---|
| 56 | }
|
---|
| 57 | catch (Exception e) {
|
---|
| 58 | e.printStackTrace();
|
---|
| 59 | }
|
---|
| 60 |
|
---|
| 61 | JSONObject json_obj = new JSONObject(sb.toString());
|
---|
| 62 |
|
---|
[30970] | 63 |
|
---|
[30898] | 64 | return json_obj;
|
---|
| 65 | }
|
---|
| 66 |
|
---|
[30970] | 67 | protected String generateSolrText(JSONObject ef_token_pos_count)
|
---|
| 68 | {
|
---|
| 69 | StringBuilder sb = new StringBuilder();
|
---|
| 70 |
|
---|
| 71 | Iterator<String> token_iter = ef_token_pos_count.keys();
|
---|
| 72 | while (token_iter.hasNext()) {
|
---|
| 73 | String token = token_iter.next();
|
---|
| 74 |
|
---|
| 75 | sb.append(token);
|
---|
| 76 | if (token_iter.hasNext()) {
|
---|
| 77 | sb.append(" ");
|
---|
| 78 | }
|
---|
| 79 | }
|
---|
| 80 |
|
---|
| 81 | /*
|
---|
| 82 | Set<String> token_keys = ef_token_pos_count.keySet();
|
---|
| 83 | for (String token : token_keys) {
|
---|
| 84 | sb.append(token + " ");
|
---|
| 85 | }
|
---|
| 86 | */
|
---|
| 87 |
|
---|
| 88 | return sb.toString();
|
---|
| 89 | }
|
---|
| 90 |
|
---|
| 91 | protected JSONObject generateSolrDocJSON(String volume_id, String page_id, JSONObject ef_page)
|
---|
| 92 | {
|
---|
[30974] | 93 | JSONObject solr_update_json = null;
|
---|
[30970] | 94 |
|
---|
| 95 | if (ef_page != null) {
|
---|
| 96 | JSONObject ef_body = ef_page.getJSONObject("body");
|
---|
| 97 | if (ef_body != null) {
|
---|
| 98 | JSONObject ef_token_pos_count = ef_body.getJSONObject("tokenPosCount");
|
---|
| 99 | if (ef_token_pos_count != null) {
|
---|
| 100 |
|
---|
[30974] | 101 | JSONObject solr_add_json = new JSONObject();
|
---|
[30970] | 102 |
|
---|
| 103 | String text = generateSolrText(ef_token_pos_count);
|
---|
| 104 |
|
---|
| 105 | JSONObject solr_doc_json = new JSONObject();
|
---|
| 106 | solr_doc_json.put("id", page_id);
|
---|
[30971] | 107 | solr_doc_json.put("volumeid_s", volume_id);
|
---|
| 108 | solr_doc_json.put("_text_", text);
|
---|
[30970] | 109 |
|
---|
[30974] | 110 | solr_add_json.put("commitWithin", 5000);
|
---|
| 111 | solr_add_json.put("doc", solr_doc_json);
|
---|
| 112 |
|
---|
| 113 | solr_update_json = new JSONObject();
|
---|
| 114 | solr_update_json.put("add",solr_add_json);
|
---|
| 115 |
|
---|
[30970] | 116 | }
|
---|
| 117 | else {
|
---|
| 118 | System.err.println("Warning: empty tokenPosCount field for '" + page_id + "'");
|
---|
| 119 | }
|
---|
| 120 | }
|
---|
| 121 | else {
|
---|
| 122 | System.err.println("Warning: empty body field for '" + page_id + "'");
|
---|
| 123 | }
|
---|
| 124 |
|
---|
| 125 | }
|
---|
| 126 | else {
|
---|
| 127 | System.err.println("Warning: null page for '" + page_id + "'");
|
---|
| 128 | }
|
---|
| 129 |
|
---|
| 130 |
|
---|
| 131 | /*
|
---|
| 132 |
|
---|
| 133 | /update/json/docs
|
---|
| 134 | */
|
---|
| 135 |
|
---|
| 136 | // For Reference ...
|
---|
| 137 | // Example documentation on Solr JSON syntax:
|
---|
| 138 | // https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers
|
---|
| 139 | // #UploadingDatawithIndexHandlers-JSONFormattedIndexUpdates
|
---|
| 140 |
|
---|
| 141 | /*
|
---|
| 142 | curl -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/my_collection/update' --data-binary '
|
---|
| 143 | {
|
---|
| 144 | "add": {
|
---|
| 145 | "doc": {
|
---|
| 146 | "id": "DOC1",
|
---|
| 147 | "my_boosted_field": { use a map with boost/value for a boosted field
|
---|
| 148 | "boost": 2.3,
|
---|
| 149 | "value": "test"
|
---|
| 150 | },
|
---|
| 151 | "my_multivalued_field": [ "aaa", "bbb" ] Can use an array for a multi-valued field
|
---|
| 152 | }
|
---|
| 153 | },
|
---|
| 154 | "add": {
|
---|
| 155 | "commitWithin": 5000, commit this document within 5 seconds
|
---|
| 156 | "overwrite": false, don't check for existing documents with the same uniqueKey
|
---|
| 157 | "boost": 3.45, a document boost
|
---|
| 158 | "doc": {
|
---|
| 159 | "f1": "v1", Can use repeated keys for a multi-valued field
|
---|
| 160 | "f1": "v2"
|
---|
| 161 | }
|
---|
| 162 | },
|
---|
| 163 |
|
---|
| 164 | "commit": {},
|
---|
| 165 | "optimize": { "waitSearcher":false },
|
---|
| 166 |
|
---|
| 167 | "delete": { "id":"ID" }, delete by ID
|
---|
| 168 | "delete": { "query":"QUERY" } delete by query
|
---|
| 169 | }'
|
---|
| 170 | */
|
---|
| 171 |
|
---|
| 172 | //return solr_doc_json;
|
---|
[30974] | 173 | return solr_update_json;
|
---|
[30970] | 174 | }
|
---|
| 175 |
|
---|
[30973] | 176 | protected void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)
|
---|
| 177 | {
|
---|
| 178 | try {
|
---|
| 179 | BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_file_json_bz2);
|
---|
| 180 | bw.write(solr_add_doc_json.toString());
|
---|
| 181 | bw.close();
|
---|
| 182 | } catch (IOException e) {
|
---|
| 183 | e.printStackTrace();
|
---|
| 184 | } catch (CompressorException e) {
|
---|
| 185 | e.printStackTrace();
|
---|
| 186 | }
|
---|
| 187 | }
|
---|
| 188 |
|
---|
[30970] | 189 | protected void postSolrDoc(JSONObject solr_add_doc_json)
|
---|
| 190 | {
|
---|
| 191 | // "http://10.11.0.53:8983/solr/"
|
---|
| 192 | String post_url = "http://10.11.0.53:8983/solr/htrc-pd-ef/update";
|
---|
| 193 |
|
---|
| 194 | //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";
|
---|
| 195 | //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";
|
---|
| 196 | //curl_popen += " --data-binary '";
|
---|
| 197 | //curl_popen += "'"
|
---|
| 198 |
|
---|
| 199 |
|
---|
| 200 | try {
|
---|
| 201 | HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));
|
---|
| 202 | httpcon.setDoOutput(true);
|
---|
| 203 | httpcon.setRequestProperty("Content-Type", "application/json");
|
---|
| 204 | httpcon.setRequestProperty("Accept", "application/json");
|
---|
| 205 | httpcon.setRequestMethod("POST");
|
---|
| 206 | httpcon.connect();
|
---|
| 207 |
|
---|
| 208 | byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");
|
---|
| 209 | OutputStream os = httpcon.getOutputStream();
|
---|
| 210 | os.write(outputBytes);
|
---|
| 211 | os.close();
|
---|
| 212 | }
|
---|
| 213 | catch (Exception e) {
|
---|
| 214 | e.printStackTrace();
|
---|
| 215 | }
|
---|
| 216 |
|
---|
| 217 | }
|
---|
[30937] | 218 | public Iterator<String> call(String json_file_in)
|
---|
[30898] | 219 | {
|
---|
[30937] | 220 | JSONObject extracted_feature_record = readJSONFile(json_file_in);
|
---|
[30898] | 221 |
|
---|
[30937] | 222 | // Check output directory for volume exists, and create it if not
|
---|
| 223 |
|
---|
| 224 |
|
---|
[30970] | 225 | String volume_id = extracted_feature_record.getString("id");
|
---|
[30898] | 226 |
|
---|
[30945] | 227 | //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
|
---|
[30970] | 228 | //String title= ef_metadata.getString("title");
|
---|
| 229 |
|
---|
[30898] | 230 | JSONObject ef_features = extracted_feature_record.getJSONObject("features");
|
---|
| 231 |
|
---|
| 232 |
|
---|
| 233 | int ef_page_count = ef_features.getInt("pageCount");
|
---|
| 234 |
|
---|
[30942] | 235 | if (_verbosity >= 1) {
|
---|
| 236 | System.out.println("Processing: " + json_file_in);
|
---|
| 237 | System.out.println(" pageCount = " + ef_page_count);
|
---|
| 238 | }
|
---|
| 239 |
|
---|
[30898] | 240 | JSONArray ef_pages = ef_features.getJSONArray("pages");
|
---|
| 241 | int ef_num_pages = ef_pages.length();
|
---|
| 242 |
|
---|
[30945] | 243 | // Make directory for page-level JSON output
|
---|
| 244 | String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2");
|
---|
| 245 | String page_json_dir = json_dir + "/pages";
|
---|
[30951] | 246 | ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
|
---|
[30945] | 247 |
|
---|
[30898] | 248 | ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
|
---|
| 249 | for (int i = 0; i < ef_page_count; i++) {
|
---|
[30945] | 250 | String formatted_i = String.format("page-%06d", i);
|
---|
[30970] | 251 | String page_id = volume_id + "." + formatted_i;
|
---|
[30945] | 252 |
|
---|
| 253 | if (_verbosity >= 2) {
|
---|
| 254 | System.out.println(" Page: " + page_id);
|
---|
| 255 | }
|
---|
| 256 |
|
---|
[30949] | 257 | String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2";
|
---|
[30946] | 258 | ids.add(output_json_bz2);
|
---|
| 259 |
|
---|
[30945] | 260 | if (i==0) {
|
---|
[30946] | 261 | System.out.println("Sample output JSON page file: " + output_json_bz2);
|
---|
[30945] | 262 | }
|
---|
[30951] | 263 |
|
---|
| 264 | JSONObject ef_page = ef_pages.getJSONObject(i);
|
---|
[30970] | 265 |
|
---|
| 266 | if (ef_page != null) {
|
---|
| 267 | // Convert to Solr add form
|
---|
| 268 | JSONObject solr_add_doc_json = generateSolrDocJSON(volume_id, page_id, ef_page);
|
---|
| 269 |
|
---|
| 270 | if (i==20) {
|
---|
| 271 | System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
|
---|
| 272 | System.out.println("==================");
|
---|
[30971] | 273 | //System.out.println("Sample text [page 20]: " + solr_add_doc_json.getString("_text_"));
|
---|
[30970] | 274 | }
|
---|
| 275 |
|
---|
| 276 | // create JSON obj of just the page (for now), and write it out
|
---|
| 277 | // write out the JSONOBject as a bz2 compressed file
|
---|
| 278 | /*
|
---|
| 279 | try {
|
---|
| 280 | BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_json_bz2);
|
---|
| 281 | bw.write(ef_page.toString());
|
---|
| 282 | bw.close();
|
---|
| 283 | } catch (IOException e) {
|
---|
| 284 | e.printStackTrace();
|
---|
| 285 | } catch (CompressorException e) {
|
---|
| 286 | e.printStackTrace();
|
---|
| 287 | }
|
---|
| 288 | */
|
---|
[30973] | 289 |
|
---|
| 290 | saveSolrDoc(solr_add_doc_json,output_json_bz2);
|
---|
| 291 | //postSolrDoc(solr_add_doc_json);
|
---|
[30970] | 292 |
|
---|
[30951] | 293 | }
|
---|
[30970] | 294 | else {
|
---|
| 295 | System.err.println("Skipping: " + page_id);
|
---|
| 296 | }
|
---|
[30951] | 297 |
|
---|
[30898] | 298 | }
|
---|
| 299 |
|
---|
| 300 | /*
|
---|
| 301 | for (int i = 0; i < ef_num_pages; i++)
|
---|
| 302 | {
|
---|
| 303 | //String post_id = ef_pages.getJSONObject(i).getString("post_id");
|
---|
| 304 | //......
|
---|
| 305 | }
|
---|
| 306 | */
|
---|
| 307 | //String pageName = json_obj.getJSONObject("pageInfo").getString("pageName");
|
---|
| 308 | /*
|
---|
| 309 | JSONArray arr = obj.getJSONArray("posts");
|
---|
| 310 | for (int i = 0; i < arr.length(); i++)
|
---|
| 311 | {
|
---|
| 312 | String post_id = arr.getJSONObject(i).getString("post_id");
|
---|
| 313 | ......
|
---|
| 314 | }
|
---|
| 315 | */
|
---|
| 316 |
|
---|
| 317 |
|
---|
[30970] | 318 | ids.add(volume_id);
|
---|
[30898] | 319 |
|
---|
| 320 | return ids.iterator();
|
---|
| 321 | }
|
---|
| 322 | }
|
---|
| 323 |
|
---|