source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java@ 30984

Last change on this file since 30984 was 30984, checked in by davidb, 7 years ago

Introduction of Spark accumulator to measure progress. Output of POST is now read in and its status checked for errors.

  • Property svn:executable set to *
File size: 10.5 KB
Line 
1package org.hathitrust;
2
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.util.DoubleAccumulator;
import org.json.JSONArray;
import org.json.JSONObject;
19
20/*
21class PagedJSON implements Function<String, Boolean> {
22
23 private static final long serialVersionUID = 1L;
24
25 public Boolean call(String s) { return s.contains("a"); }
26}
27 */
28
29
30class PagedJSON implements FlatMapFunction<String, String>
31{
32 private static final long serialVersionUID = 1L;
33
34 protected String _input_dir;
35 protected String _solr_url;
36 protected String _output_dir;
37 protected int _verbosity;
38
39 DoubleAccumulator _progress_accum;
40 double _progress_step;
41
42 public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity,
43 DoubleAccumulator progress_accum, double progress_step)
44 {
45 _input_dir = input_dir;
46 _solr_url = solr_url;
47 _output_dir = output_dir;
48 _verbosity = verbosity;
49
50 _progress_accum = progress_accum;
51 _progress_step = progress_step;
52 }
53
54 protected JSONObject readJSONFile(String filename)
55 {
56 StringBuilder sb = new StringBuilder();
57
58 try {
59
60 String str;
61 BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
62 while ((str = br.readLine()) != null) {
63 sb.append(str);
64 }
65
66 br.close();
67 }
68 catch (Exception e) {
69 e.printStackTrace();
70 }
71
72 JSONObject json_obj = new JSONObject(sb.toString());
73
74
75 return json_obj;
76 }
77
78 protected String generateSolrText(JSONObject ef_token_pos_count)
79 {
80 StringBuilder sb = new StringBuilder();
81
82 Iterator<String> token_iter = ef_token_pos_count.keys();
83 while (token_iter.hasNext()) {
84 String token = token_iter.next();
85
86 sb.append(token);
87 if (token_iter.hasNext()) {
88 sb.append(" ");
89 }
90 }
91
92 /*
93 Set<String> token_keys = ef_token_pos_count.keySet();
94 for (String token : token_keys) {
95 sb.append(token + " ");
96 }
97*/
98
99 return sb.toString();
100 }
101
102 protected JSONObject generateSolrDocJSON(String volume_id, String page_id, JSONObject ef_page)
103 {
104 JSONObject solr_update_json = null;
105
106 if (ef_page != null) {
107 JSONObject ef_body = ef_page.getJSONObject("body");
108 if (ef_body != null) {
109 JSONObject ef_token_pos_count = ef_body.getJSONObject("tokenPosCount");
110 if (ef_token_pos_count != null) {
111
112 JSONObject solr_add_json = new JSONObject();
113
114 String text = generateSolrText(ef_token_pos_count);
115
116 JSONObject solr_doc_json = new JSONObject();
117 solr_doc_json.put("id", page_id);
118 solr_doc_json.put("volumeid_s", volume_id);
119 solr_doc_json.put("_text_", text);
120
121 solr_add_json.put("commitWithin", 5000);
122 solr_add_json.put("doc", solr_doc_json);
123
124 solr_update_json = new JSONObject();
125 solr_update_json.put("add",solr_add_json);
126
127 }
128 else {
129 System.err.println("Warning: empty tokenPosCount field for '" + page_id + "'");
130 }
131 }
132 else {
133 System.err.println("Warning: empty body field for '" + page_id + "'");
134 }
135
136 }
137 else {
138 System.err.println("Warning: null page for '" + page_id + "'");
139 }
140
141
142 /*
143
144 /update/json/docs
145 */
146
147 // For Reference ...
148 // Example documentation on Solr JSON syntax:
149 // https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers
150 // #UploadingDatawithIndexHandlers-JSONFormattedIndexUpdates
151
152 /*
153 curl -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/my_collection/update' --data-binary '
154 {
155 "add": {
156 "doc": {
157 "id": "DOC1",
158 "my_boosted_field": { use a map with boost/value for a boosted field
159 "boost": 2.3,
160 "value": "test"
161 },
162 "my_multivalued_field": [ "aaa", "bbb" ] Can use an array for a multi-valued field
163 }
164 },
165 "add": {
166 "commitWithin": 5000, commit this document within 5 seconds
167 "overwrite": false, don't check for existing documents with the same uniqueKey
168 "boost": 3.45, a document boost
169 "doc": {
170 "f1": "v1", Can use repeated keys for a multi-valued field
171 "f1": "v2"
172 }
173 },
174
175 "commit": {},
176 "optimize": { "waitSearcher":false },
177
178 "delete": { "id":"ID" }, delete by ID
179 "delete": { "query":"QUERY" } delete by query
180 }'
181 */
182
183 return solr_update_json;
184 }
185
186 protected void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)
187 {
188 try {
189 BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_file_json_bz2);
190 bw.write(solr_add_doc_json.toString());
191 bw.close();
192 } catch (IOException e) {
193 e.printStackTrace();
194 } catch (CompressorException e) {
195 e.printStackTrace();
196 }
197 }
198
199 protected void postSolrDoc(JSONObject solr_add_doc_json)
200 {
201 String post_url = _solr_url;
202
203 //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";
204 //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";
205 //curl_popen += " --data-binary '";
206 //curl_popen += "'"
207
208
209 try {
210 HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));
211 httpcon.setDoOutput(true);
212 httpcon.setRequestProperty("Content-Type", "application/json");
213 httpcon.setRequestProperty("Accept", "application/json");
214 httpcon.setRequestMethod("POST");
215 httpcon.connect();
216
217 byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");
218 OutputStream os = httpcon.getOutputStream();
219 os.write(outputBytes);
220 os.close();
221
222
223 // Read response
224 StringBuilder sb = new StringBuilder();
225 BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));
226 String decodedString;
227 while ((decodedString = in.readLine()) != null) {
228 //System.out.println(decodedString);
229 sb.append(decodedString);
230 }
231 in.close();
232
233 JSONObject solr_status_json = new JSONObject(sb.toString());
234 JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader");
235 if (response_header_json != null) {
236 int status = response_header_json.getInt("status");
237 if (status != 0) {
238 System.err.println("Warning: POST request to " + post_url + " returned status " + status);
239 System.err.println("Full response was: " + sb);
240 }
241 }
242 else {
243 System.err.println("Failed response to Solr POST: " + sb);
244 }
245
246
247
248 }
249 catch (Exception e) {
250 e.printStackTrace();
251 }
252
253 }
254 public Iterator<String> call(String json_file_in)
255 {
256 JSONObject extracted_feature_record = readJSONFile(json_file_in);
257
258 String volume_id = extracted_feature_record.getString("id");
259
260 //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
261 //String title= ef_metadata.getString("title");
262
263 JSONObject ef_features = extracted_feature_record.getJSONObject("features");
264
265
266 int ef_page_count = ef_features.getInt("pageCount");
267
268 if (_verbosity >= 1) {
269 System.out.println("Processing: " + json_file_in);
270 System.out.println(" pageCount = " + ef_page_count);
271 }
272
273 JSONArray ef_pages = ef_features.getJSONArray("pages");
274 int ef_num_pages = ef_pages.length();
275
276 // Make directory for page-level JSON output
277 String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2");
278 String page_json_dir = json_dir + "/pages";
279 ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
280
281 ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
282 for (int i = 0; i < ef_page_count; i++) {
283 String formatted_i = String.format("page-%06d", i);
284 String page_id = volume_id + "." + formatted_i;
285
286 if (_verbosity >= 2) {
287 System.out.println(" Page: " + page_id);
288 }
289
290 String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2";
291 ids.add(output_json_bz2);
292
293 if (i==0) {
294 System.out.println("Sample output JSON page file: " + output_json_bz2);
295 }
296
297 JSONObject ef_page = ef_pages.getJSONObject(i);
298
299 if (ef_page != null) {
300 // Convert to Solr add form
301 JSONObject solr_add_doc_json = generateSolrDocJSON(volume_id, page_id, ef_page);
302
303 if (i==20) {
304 System.out.println("==================");
305 System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
306 System.out.println("==================");
307 //System.out.println("Sample text [page 20]: " + solr_add_doc_json.getString("_text_"));
308 }
309
310 // create JSON obj of just the page (for now), and write it out
311 // write out the JSONOBject as a bz2 compressed file
312 /*
313 try {
314 BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_json_bz2);
315 bw.write(ef_page.toString());
316 bw.close();
317 } catch (IOException e) {
318 e.printStackTrace();
319 } catch (CompressorException e) {
320 e.printStackTrace();
321 }
322 */
323
324 if (_solr_url != null) {
325 if (i==20) {
326 System.out.println("==================");
327 System.out.println("Posting to: " + _solr_url);
328 System.out.println("==================");
329 }
330 postSolrDoc(solr_add_doc_json);
331 }
332
333 if (_output_dir != null) {
334 if (i==20) {
335 System.out.println("==================");
336 System.out.println("Saving to: " + _output_dir);
337 System.out.println("==================");
338 }
339 saveSolrDoc(solr_add_doc_json,output_json_bz2);
340 }
341 }
342 else {
343 System.err.println("Skipping: " + page_id);
344 }
345
346 }
347
348 /*
349 for (int i = 0; i < ef_num_pages; i++)
350 {
351 //String post_id = ef_pages.getJSONObject(i).getString("post_id");
352 //......
353 }
354 */
355 //String pageName = json_obj.getJSONObject("pageInfo").getString("pageName");
356/*
357 JSONArray arr = obj.getJSONArray("posts");
358 for (int i = 0; i < arr.length(); i++)
359 {
360 String post_id = arr.getJSONObject(i).getString("post_id");
361 ......
362 }
363*/
364
365
366 ids.add(volume_id);
367 _progress_accum.add(_progress_step);
368 return ids.iterator();
369 }
370}
371
Note: See TracBrowser for help on using the repository browser.