source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeMongoDBDocumentsMap.java@ 31313

Last change on this file: r31313, checked in by davidb, 7 years ago

Alternative to avoid concurrency update exception

  • Property svn:executable set to *
File size: 7.7 KB
package org.hathitrust.extractedfeatures;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.DoubleAccumulator;
import org.bson.Document;
import org.json.JSONObject;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

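/**
 * Spark map function that ingests a single HathiTrust Extracted Features
 * JSON volume into MongoDB. MongoDB field names may not contain '.' or
 * '$', so offending keys are rewritten with "<PERIOD>" and "<DOLLAR>"
 * placeholders before insertion.
 */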
class PerVolumeMongoDBDocumentsMap implements Function<String, Integer>
{
	private static final long serialVersionUID = 1L;

	protected String _input_dir;
	protected int _verbosity;

	protected DoubleAccumulator _progress_accum;
	protected double _progress_step;

	boolean _strict_file_io;

	public PerVolumeMongoDBDocumentsMap(String input_dir, int verbosity,
			DoubleAccumulator progress_accum, double progress_step,
			boolean strict_file_io)
	{
		_input_dir = input_dir;
		_verbosity = verbosity;

		_progress_accum = progress_accum;
		_progress_step = progress_step;

		_strict_file_io = strict_file_io;
	}

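	// MongoDB rejects field names containing '.' or '$'; rewrite any such
	// keys in this count map with placeholder tokens. The key set is copied
	// into a list first so the map can be mutated while it is traversed
	// (the concurrency update exception referred to in the commit message).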
	protected void fixup_section(Document ef_count)
	{
		List<String> keys = new ArrayList<String>(ef_count.keySet());

		for (String key : keys) {
			if (key.contains(".")) {
				String new_key = key.replaceAll("\\.", "<PERIOD>");
				ef_count.put(new_key, ef_count.get(key));
				ef_count.remove(key);
				key = new_key;
			}

			if (key.contains("$")) {
				String new_key = key.replaceAll("\\$", "<DOLLAR>");
				ef_count.put(new_key, ef_count.get(key));
				ef_count.remove(key);
			}
		}
	}

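	// Walk the header/body/footer zones of a page and sanitize each of the
	// count maps they contain. For tokenPosCount the per-token sub-documents
	// (token -> {POS-tag: count}) are sanitized too, since both the token
	// strings and Penn Treebank POS tags (which include '.' and '$') can
	// collide with MongoDB's reserved field-name characters.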
	protected void fixup_page(String volume_id, String page_id, Document ef_page)
	{
		if (ef_page != null) {
			String[] zone_keys = { "header", "body", "footer" };

			for (String zone_key: zone_keys) {
				Document ef_zone = (Document)ef_page.get(zone_key);
				if (ef_zone != null) {
					String[] count_keys = { "beginCharCounts", "endCharCount", "tokenPosCount" };

					for (String count_key: count_keys) {
						Document ef_sub_section = (Document)ef_zone.get(count_key);
						if (ef_sub_section != null) {
							fixup_section(ef_sub_section);

							if (count_key.equals("tokenPosCount")) {
								Set<String> key_set = ef_sub_section.keySet();
								for (String key : key_set) {
									Document token_section = (Document)ef_sub_section.get(key);
									fixup_section(token_section);
								}
							}
						}
					}
				}
			}
		}
		else {
			System.err.println("Warning: null page for '" + page_id + "'");
		}
	}
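
	// Rename the volume's "id" field to "_id" so the HathiTrust volume ID
	// becomes the MongoDB primary key, warn if the number of page records
	// disagrees with the 'pageCount' metadata, then sanitize every page.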
	protected void fixup_volume(String json_file_in, Document extracted_feature_record)
	{
		String full_json_file_in = _input_dir + "/" + json_file_in;

		if (extracted_feature_record != null) {
			String volume_id = extracted_feature_record.getString("id");
			extracted_feature_record.put("_id", volume_id);
			extracted_feature_record.remove("id");

			Document ef_features = (Document)extracted_feature_record.get("features");

			int ef_page_count = ef_features.getInteger("pageCount");

			if (_verbosity >= 1) {
				System.out.println("Processing: " + json_file_in);
				System.out.println("  pageCount = " + ef_page_count);
			}

			List<Document> ef_pages = (List<Document>)ef_features.get("pages");
			int ef_num_pages = ef_pages.size();
			if (ef_num_pages != ef_page_count) {
				System.err.println("Warning: number of page elements in JSON (" + ef_num_pages + ")"
						+ " does not match 'pageCount' metadata (" + ef_page_count + ")");
			}

			if (_verbosity >= 2) {
				System.out.print("  Pages: ");
			}

			for (int i = 0; i < ef_page_count; i++) {
				String formatted_i = String.format("page-%06d", i);
				String page_id = volume_id + "." + formatted_i;

				if (_verbosity >= 2) {
					if (i > 0) {
						System.out.print(", ");
					}
					System.out.print(page_id);

					if (i == (ef_page_count - 1)) {
						System.out.println();
					}
				}

				Document ef_page = (Document)ef_pages.get(i);

				if (ef_page != null) {
					fixup_page(volume_id, page_id, ef_page);
				}
				else {
					System.err.println("Skipping: " + page_id);
				}
			}
		}
		else {
			// File did not exist, or could not be parsed
			String mess = "Failed to read in bzipped JSON file '" + full_json_file_in + "'";

			System.err.println("Warning: " + mess);
			System.out.println("Warning: " + mess);
		}
	}

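	// Spark invocation point: called once per input JSON filename. Opens a
	// connection to the MongoDB replica set, reads the (bzipped) JSON volume
	// file, sanitizes it, and inserts it into the htrc_ef.volumes collection.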
	public Integer call(String json_file_in) throws IOException
	{
		try {
			MongoClientURI mongo_url = new MongoClientURI("mongodb://gc3:27017,gc4:27017,gc5:27017");
			MongoClient mongoClient = new MongoClient(mongo_url);

			MongoDatabase database = mongoClient.getDatabase("htrc_ef");
			MongoCollection<Document> collection = database.getCollection("volumes");

			String full_json_file_in = _input_dir + "/" + json_file_in;
			System.out.println("Processing: " + full_json_file_in);
			String extracted_feature_json_doc = ClusterFileIO.readTextFile(full_json_file_in);

			Document doc = Document.parse(extracted_feature_json_doc);

			fixup_volume(json_file_in, doc);

			collection.insertOne(doc);

			mongoClient.close();
		}
		catch (MongoException e) {
			e.printStackTrace();
		}

		return 1;
	}
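
	// Alternative map function that just extracts each volume's page count
	// (rather than ingesting the volume). Honors _strict_file_io: an
	// unreadable file either raises an IOException or is logged and treated
	// as having 0 pages. The progress accumulator is bumped either way.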
	public Integer callPageCount(String json_file_in) throws IOException
	{
		Integer page_count = 0;

		String full_json_file_in = _input_dir + "/" + json_file_in;
		JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in);

		if (extracted_feature_record != null) {
			String volume_id = extracted_feature_record.getString("id");

			// optJSONObject() returns null (rather than throwing) when the
			// 'features' section is missing
			JSONObject ef_features = extracted_feature_record.optJSONObject("features");

			if (_verbosity >= 1) {
				System.out.println("Processing: " + json_file_in);
			}

			if (ef_features != null) {
				// 'pageCount' is numeric in the EF JSON; optInt() also tolerates
				// a quoted value, and returns -1 here if the field is absent
				int page_count_val = ef_features.optInt("pageCount", -1);
				if (page_count_val >= 0) {
					page_count = page_count_val;
				}
				else {
					System.err.println("No 'pageCount' in 'features' in volume id '" + volume_id + "' => defaulting to 0");
				}
			}
			else {
				System.err.println("No 'features' section in JSON file => Skipping id: " + volume_id);
			}
		}
		else {
			// File did not exist, or could not be parsed
			String mess = "Failed to read in bzipped JSON file '" + full_json_file_in + "'";
			if (_strict_file_io) {
				throw new IOException(mess);
			}
			else {
				System.err.println("Warning: " + mess);
				System.out.println("Warning: " + mess);
			}
		}

		_progress_accum.add(_progress_step);

		return page_count;
	}
}
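
// Example driver usage (a minimal sketch: the RDD and variable names below
// are illustrative assumptions, not part of this project's driver code):
//
//   SparkConf conf = new SparkConf().setAppName("EF MongoDB ingest");
//   JavaSparkContext jsc = new JavaSparkContext(conf);
//   DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator();
//
//   JavaRDD<String> json_file_list = jsc.textFile("ef-json-filelist.txt");
//   long num_files = json_file_list.count();
//
//   PerVolumeMongoDBDocumentsMap doc_map
//       = new PerVolumeMongoDBDocumentsMap(input_dir, 1, progress_accum,
//                                          100.0 / num_files, false);
//   json_file_list.map(doc_map).count(); // count() forces the ingest to run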