source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java@30985

Last change on this file since 30985 was 30985, checked in by davidb, 7 years ago

Changed to run the main processing method as an action rather than a transform. Done to help the accumulator add

  • Property svn:executable set to *
File size: 9.7 KB
package org.hathitrust;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.DoubleAccumulator;
import org.json.JSONArray;
import org.json.JSONObject;

/*
class PagedJSON implements Function<String, Boolean> {

    private static final long serialVersionUID = 1L;

    public Boolean call(String s) { return s.contains("a"); }
}
 */

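/**
 * Processes one HathiTrust Extracted Features volume file per call: reads the
 * compressed per-volume JSON, converts each page to a Solr "add" document, and
 * optionally POSTs it to Solr and/or writes it out as a .json.bz2 file.
 *
 * Implemented as a Spark VoidFunction so it runs as an action (e.g. via
 * foreach) rather than a transform, which lets the progress accumulator
 * update reliably.
 *
 * Illustrative driver usage (a sketch only; the RDD name and accumulator
 * setup are assumptions, not part of this file):
 *
 *   DoubleAccumulator progress = jsc.sc().doubleAccumulator("Progress Percent");
 *   double per_vol = 100.0 / json_list_rdd.count();
 *   json_list_rdd.foreach(
 *       new PagedJSON(input_dir, solr_url, output_dir, verbosity, progress, per_vol));
 */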
//class PagedJSON implements FlatMapFunction<String, String>
class PagedJSON implements VoidFunction<String>
{
    private static final long serialVersionUID = 1L;

    protected String _input_dir;
    protected String _solr_url;
    protected String _output_dir;
    protected int    _verbosity;

    DoubleAccumulator _progress_accum;
    double _progress_step;

    public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity,
                     DoubleAccumulator progress_accum, double progress_step)
    {
        _input_dir  = input_dir;
        _solr_url   = solr_url;
        _output_dir = output_dir;
        _verbosity  = verbosity;

        _progress_accum = progress_accum;
        _progress_step  = progress_step;
    }

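    // Reads a compressed JSON file from _input_dir and parses it.
    // Note: on a read error this prints the stack trace and falls through,
    // so the JSONObject is then built from whatever was read (possibly an
    // empty string, which would make the JSONObject constructor throw).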
    protected JSONObject readJSONFile(String filename)
    {
        StringBuilder sb = new StringBuilder();

        try {
            String str;
            BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
            while ((str = br.readLine()) != null) {
                sb.append(str);
            }

            br.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }

        JSONObject json_obj = new JSONObject(sb.toString());

        return json_obj;
    }

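    // Concatenates the token keys of a page's tokenPosCount object into a
    // single space-separated string, used as the page's full-text field.
    // Token order follows JSONObject key iteration order, so it is not the
    // original reading order.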
    protected String generateSolrText(JSONObject ef_token_pos_count)
    {
        StringBuilder sb = new StringBuilder();

        Iterator<String> token_iter = ef_token_pos_count.keys();
        while (token_iter.hasNext()) {
            String token = token_iter.next();

            sb.append(token);
            if (token_iter.hasNext()) {
                sb.append(" ");
            }
        }

        /*
        Set<String> token_keys = ef_token_pos_count.keySet();
        for (String token : token_keys) {
            sb.append(token + " ");
        }
        */

        return sb.toString();
    }

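    // Builds the Solr "add" JSON for one page:
    //   { "add": { "commitWithin": 5000,
    //              "doc": { "id": ..., "volumeid_s": ..., "_text_": ... } } }
    // Returns null (with a warning) if the page, its body, or its
    // tokenPosCount field is missing.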
    protected JSONObject generateSolrDocJSON(String volume_id, String page_id, JSONObject ef_page)
    {
        JSONObject solr_update_json = null;

        if (ef_page != null) {
            // optJSONObject() returns null for a missing field, where
            // getJSONObject() would throw and never reach the warnings below
            JSONObject ef_body = ef_page.optJSONObject("body");
            if (ef_body != null) {
                JSONObject ef_token_pos_count = ef_body.optJSONObject("tokenPosCount");
                if (ef_token_pos_count != null) {

                    JSONObject solr_add_json = new JSONObject();

                    String text = generateSolrText(ef_token_pos_count);

                    JSONObject solr_doc_json = new JSONObject();
                    solr_doc_json.put("id", page_id);
                    solr_doc_json.put("volumeid_s", volume_id);
                    solr_doc_json.put("_text_", text);

                    solr_add_json.put("commitWithin", 5000);
                    solr_add_json.put("doc", solr_doc_json);

                    solr_update_json = new JSONObject();
                    solr_update_json.put("add", solr_add_json);
                }
                else {
                    System.err.println("Warning: empty tokenPosCount field for '" + page_id + "'");
                }
            }
            else {
                System.err.println("Warning: empty body field for '" + page_id + "'");
            }
        }
        else {
            System.err.println("Warning: null page for '" + page_id + "'");
        }

        // For Reference ...
        // Solr also accepts plain JSON documents at /update/json/docs
        // Example documentation on Solr JSON syntax:
        //   https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers
        //   #UploadingDatawithIndexHandlers-JSONFormattedIndexUpdates
        /*
        curl -X POST -H 'Content-Type: application/json' 'http://localhost:8983/solr/my_collection/update' --data-binary '
        {
          "add": {
            "doc": {
              "id": "DOC1",
              "my_boosted_field": {         use a map with boost/value for a boosted field
                "boost": 2.3,
                "value": "test"
              },
              "my_multivalued_field": [ "aaa", "bbb" ]   Can use an array for a multi-valued field
            }
          },
          "add": {
            "commitWithin": 5000,           commit this document within 5 seconds
            "overwrite": false,             don't check for existing documents with the same uniqueKey
            "boost": 3.45,                  a document boost
            "doc": {
              "f1": "v1",                   Can use repeated keys for a multi-valued field
              "f1": "v2"
            }
          },

          "commit": {},
          "optimize": { "waitSearcher":false },

          "delete": { "id":"ID" },          delete by ID
          "delete": { "query":"QUERY" }     delete by query
        }'
        */

        return solr_update_json;
    }

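    // Writes the Solr add-document JSON to a BZip2-compressed file under
    // _output_dir. IO/compression errors are printed and otherwise ignored.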
    protected void saveSolrDoc(JSONObject solr_add_doc_json, String output_file_json_bz2)
    {
        try {
            BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_file_json_bz2);
            bw.write(solr_add_doc_json.toString());
            bw.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        catch (CompressorException e) {
            e.printStackTrace();
        }
    }

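    // POSTs the add-document JSON to the Solr update URL in _solr_url and
    // checks the "status" field of the responseHeader in Solr's reply,
    // warning on any non-zero status.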
    protected void postSolrDoc(JSONObject solr_add_doc_json)
    {
        String post_url = _solr_url;

        //String curl_popen = "curl -X POST -H 'Content-Type: application/json'";
        //curl_popen += " 'http://10.11.0.53:8983/solr/htrc-pd-ef/update'";
        //curl_popen += " --data-binary '";
        //curl_popen += "'"

        try {
            HttpURLConnection httpcon = (HttpURLConnection) ((new URL(post_url).openConnection()));
            httpcon.setDoOutput(true);
            httpcon.setRequestProperty("Content-Type", "application/json");
            httpcon.setRequestProperty("Accept", "application/json");
            httpcon.setRequestMethod("POST");
            httpcon.connect();

            byte[] outputBytes = solr_add_doc_json.toString().getBytes("UTF-8");
            OutputStream os = httpcon.getOutputStream();
            os.write(outputBytes);
            os.close();

            // Read response
            StringBuilder sb = new StringBuilder();
            BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));
            String decodedString;
            while ((decodedString = in.readLine()) != null) {
                sb.append(decodedString);
            }
            in.close();

            JSONObject solr_status_json = new JSONObject(sb.toString());
            // optJSONObject() so a reply without a responseHeader reaches the
            // error branch below instead of throwing
            JSONObject response_header_json = solr_status_json.optJSONObject("responseHeader");
            if (response_header_json != null) {
                int status = response_header_json.getInt("status");
                if (status != 0) {
                    System.err.println("Warning: POST request to " + post_url + " returned status " + status);
                    System.err.println("Full response was: " + sb);
                }
            }
            else {
                System.err.println("Failed response to Solr POST: " + sb);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

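    // Spark action entry point: processes one volume-level Extracted Features
    // file. For each page it generates the Solr add document, optionally POSTs
    // it to Solr, and optionally saves it under _output_dir, then adds
    // _progress_step to the shared progress accumulator.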
    //public Iterator<String> call(String json_file_in)
    public void call(String json_file_in)
    {
        JSONObject extracted_feature_record = readJSONFile(json_file_in);

        String volume_id = extracted_feature_record.getString("id");

        //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
        //String title = ef_metadata.getString("title");

        JSONObject ef_features = extracted_feature_record.getJSONObject("features");

        int ef_page_count = ef_features.getInt("pageCount");

        if (_verbosity >= 1) {
            System.out.println("Processing: " + json_file_in);
            System.out.println("  pageCount = " + ef_page_count);
        }

        JSONArray ef_pages = ef_features.getJSONArray("pages");
        int ef_num_pages = ef_pages.length();

        // Make directory for page-level JSON output
        String json_dir = ClusterFileIO.removeSuffix(json_file_in, ".json.bz2");
        String page_json_dir = json_dir + "/pages";
        ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);

        ArrayList<String> ids = new ArrayList<String>(ef_num_pages);

        // Assumes the declared pageCount matches the length of the pages array
        for (int i = 0; i < ef_page_count; i++) {
            String formatted_i = String.format("page-%06d", i);
            String page_id = volume_id + "." + formatted_i;

            if (_verbosity >= 2) {
                System.out.println("  Page: " + page_id);
            }

            String output_json_bz2 = page_json_dir + "/" + formatted_i + ".json.bz2";
            ids.add(output_json_bz2);

            if (i == 0) {
                System.out.println("Sample output JSON page file: " + output_json_bz2);
            }

            // optJSONObject() returns null if entry i is missing or not an
            // object, which the "Skipping" branch below reports
            JSONObject ef_page = ef_pages.optJSONObject(i);

            if (ef_page != null) {
                // Convert to Solr add form
                JSONObject solr_add_doc_json = generateSolrDocJSON(volume_id, page_id, ef_page);

                // generateSolrDocJSON() returns null for a malformed page;
                // skip POSTing/saving (and the sample printout) in that case
                if (solr_add_doc_json != null) {

                    if (i == 20) {
                        System.out.println("==================");
                        System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
                        System.out.println("==================");
                    }

                    if (_solr_url != null) {
                        if (i == 20) {
                            System.out.println("==================");
                            System.out.println("Posting to: " + _solr_url);
                            System.out.println("==================");
                        }
                        postSolrDoc(solr_add_doc_json);
                    }

                    if (_output_dir != null) {
                        if (i == 20) {
                            System.out.println("==================");
                            System.out.println("Saving to: " + _output_dir);
                            System.out.println("==================");
                        }
                        saveSolrDoc(solr_add_doc_json, output_json_bz2);
                    }
                }
            }
            else {
                System.err.println("Skipping: " + page_id);
            }
        }

        // 'ids' is left over from the earlier FlatMapFunction version of this
        // class (see the commented-out signature above); it is collected but
        // no longer returned
        ids.add(volume_id);
        _progress_accum.add(_progress_step);

        //return ids.iterator();
    }
}