source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeMongoDBDocumentsMap.java@ 31319

Last change on this file since 31319 was 31319, checked in by davidb, 7 years ago

Changed to replace existing MongoDB entry. Fixed up print statement

  • Property svn:executable set to *
File size: 7.5 KB
package org.hathitrust.extractedfeatures;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.DoubleAccumulator;
import org.bson.Document;
import org.json.JSONObject;

import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;

class PerVolumeMongoDBDocumentsMap implements Function<String, Integer>
{
	private static final long serialVersionUID = 1L;

	protected String _input_dir;
	protected int _verbosity;

	protected DoubleAccumulator _progress_accum;
	protected double _progress_step;

	boolean _strict_file_io;

	public PerVolumeMongoDBDocumentsMap(String input_dir, int verbosity,
			DoubleAccumulator progress_accum, double progress_step,
			boolean strict_file_io)
	{
		_input_dir = input_dir;
		_verbosity = verbosity;

		_progress_accum = progress_accum;
		_progress_step = progress_step;

		_strict_file_io = strict_file_io;
	}

	// MongoDB field names may not contain '.' or '$', so rewrite any keys
	// that use them with printable placeholder tokens
	protected void fixup_section(Document ef_count)
	{
		Set<String> key_set = ef_count.keySet();
		String[] key_array = key_set.toArray(new String[key_set.size()]);

		for (int i=0; i<key_array.length; i++) {

			String key = key_array[i];
			if (key.contains(".")) {
				String new_key = key.replaceAll("\\.", "<PERIOD>");
				ef_count.put(new_key, ef_count.get(key));
				ef_count.remove(key);
				key = new_key;
			}

			if (key.contains("$")) {
				String new_key = key.replaceAll("\\$", "<DOLLAR>");
				ef_count.put(new_key, ef_count.get(key));
				ef_count.remove(key);
			}
		}
	}
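
	// For example (values invented for illustration), a token key such as
	//   "e.g." : { "FW" : 2 }
	// is rewritten to
	//   "e<PERIOD>g<PERIOD>" : { "FW" : 2 }
	// so MongoDB accepts the field name; the count value is untouched.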

	// Apply the key fixups to each zone of a page (header/body/footer),
	// including the per-token POS sub-documents nested under 'tokenPosCount'
	protected void fixup_page(String volume_id, String page_id, Document ef_page)
	{
		if (ef_page != null) {
			String[] zone_keys = { "header", "body", "footer" };

			for (String zone_key: zone_keys) {
				Document ef_zone = (Document)ef_page.get(zone_key);
				if (ef_zone != null) {
					String[] count_keys = { "beginCharCounts", "endCharCount", "tokenPosCount" };

					for (String count_key: count_keys) {
						Document ef_sub_section = (Document)ef_zone.get(count_key);
						if (ef_sub_section != null) {
							fixup_section(ef_sub_section);

							if (count_key.equals("tokenPosCount")) {
								Set<String> key_set = ef_sub_section.keySet();
								for (String key : key_set) {
									Document token_section = (Document)ef_sub_section.get(key);
									fixup_section(token_section);
								}
							}
						}
					}
				}
			}
		}
		else {
			System.err.println("Warning: null page for '" + page_id + "'");
		}
	}
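
	// A sketch of the per-page layout this method assumes (abridged, with
	// invented values; only the keys named above are touched):
	//
	//   { "header" : null,
	//     "body"   : { "tokenPosCount" : { "the" : { "DT" : 12 } }, ... },
	//     "footer" : null }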

	// Rekey a parsed volume record for MongoDB ('id' becomes '_id') and
	// apply the page-level fixups to every page listed under 'features'
	protected void fixup_volume(String json_file_in, Document extracted_feature_record)
	{
		String full_json_file_in = _input_dir + "/" + json_file_in;

		if (extracted_feature_record != null) {
			String volume_id = extracted_feature_record.getString("id");
			extracted_feature_record.put("_id",volume_id);
			extracted_feature_record.remove("id");

			Document ef_features = (Document)extracted_feature_record.get("features");

			int ef_page_count = ef_features.getInteger("pageCount");

			if (_verbosity >= 1) {
				System.out.println("Processing: " + json_file_in);
				System.out.println("  pageCount = " + ef_page_count);
			}

			List<Document> ef_pages = (List<Document>)ef_features.get("pages");
			int ef_num_pages = ef_pages.size();
			if (ef_num_pages != ef_page_count) {
				System.err.println("Warning: number of page elements in JSON (" + ef_num_pages + ")"
						+ " does not match 'pageCount' metadata (" + ef_page_count + ")");
			}

			if (_verbosity >= 2) {
				System.out.print("  Pages: ");
			}

			for (int i = 0; i < ef_page_count; i++) {
				String formatted_i = String.format("page-%06d", i);
				String page_id = volume_id + "." + formatted_i;

				if (_verbosity >= 2) {
					if (i>0) {
						System.out.print(", ");
					}
					System.out.print(page_id);

					if (i==(ef_page_count-1)) {
						System.out.println();
					}
				}

				Document ef_page = (Document)ef_pages.get(i);

				if (ef_page != null) {
					fixup_page(volume_id, page_id, ef_page);
				}
				else {
					System.err.println("Skipping: " + page_id);
				}
			}
		}
		else {
			// File did not exist, or could not be parsed
			String mess = "Failed to read in bzipped JSON file '" + full_json_file_in + "'";

			System.err.println("Warning: " + mess);
			System.out.println("Warning: " + mess);
		}
	}
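
	// For example (invented id), a record parsed as
	//   { "id" : "mdp.39015012345678", "features" : { "pageCount" : 8, "pages" : [ ... ] } }
	// leaves fixup_volume() as
	//   { "_id" : "mdp.39015012345678", "features" : { ... } }
	// ready to be keyed directly by MongoDB.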

	public Integer call(String json_file_in) throws IOException
	{
		try {
			MongoClientURI mongo_url = new MongoClientURI("mongodb://gc3:27017,gc4:27017,gc5:27017");
			MongoClient mongoClient = new MongoClient(mongo_url);

			MongoDatabase database = mongoClient.getDatabase("htrc_ef");
			MongoCollection<Document> collection = database.getCollection("volumes");

			String full_json_file_in = _input_dir + "/" + json_file_in;
			System.out.println("Processing: " + full_json_file_in);
			String extracted_feature_json_doc = ClusterFileIO.readTextFile(full_json_file_in);

			Document doc = Document.parse(extracted_feature_json_doc);

			fixup_volume(json_file_in,doc);

			// Build the filter as a Document rather than by string concatenation,
			// so the volume id (which contains '.') is quoted correctly
			collection.findOneAndReplace(new Document("_id", doc.getString("_id")), doc);

			mongoClient.close();

		} catch (MongoException e) {
			e.printStackTrace();
		}

		return 1;
	}
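
	// Note that findOneAndReplace() leaves the collection unchanged when no
	// matching entry exists. If insert-or-replace semantics are wanted, the
	// driver supports an upsert option (a sketch, not what the code above does):
	//   collection.findOneAndReplace(new Document("_id", doc.getString("_id")), doc,
	//                                new FindOneAndReplaceOptions().upsert(true));
	// (requires: import com.mongodb.client.model.FindOneAndReplaceOptions;)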

	// Alternative map function: rather than ingesting into MongoDB, read each
	// volume's JSON file and return its 'pageCount' value
	public Integer callPageCount(String json_file_in) throws IOException
	{
		Integer page_count = 0;

		String full_json_file_in = _input_dir + "/" + json_file_in;
		JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in);

		if (extracted_feature_record != null) {
			String volume_id = extracted_feature_record.getString("id");

			// optJSONObject()/optString() return null/"" when a key is absent,
			// where the plain get...() forms would throw, keeping the checks below meaningful
			JSONObject ef_features = extracted_feature_record.optJSONObject("features");

			if (_verbosity >= 1) {
				System.out.println("Processing: " + json_file_in);
			}

			if (ef_features != null) {
				String page_count_str = ef_features.optString("pageCount", "");
				if (!page_count_str.equals("")) {
					page_count = Integer.parseInt(page_count_str);
				}
				else {
					System.err.println("No 'pageCount' in 'features' in volume id '" + volume_id + "' => defaulting to 0");
				}
			}
			else {
				System.err.println("No 'features' section in JSON file => Skipping id: " + volume_id);
			}
		}
		else {
			// File did not exist, or could not be parsed
			String mess = "Failed to read in bzipped JSON file '" + full_json_file_in + "'";
			if (_strict_file_io) {
				throw new IOException(mess);
			}
			else {
				System.err.println("Warning: " + mess);
				System.out.println("Warning: " + mess);
			}
		}

		_progress_accum.add(_progress_step);

		return page_count;
	}
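
	// Example driver-side usage (a sketch: the variable names and surrounding
	// Spark setup are assumptions, not part of this file):
	//
	//   JavaRDD<String> json_file_list = jsc.textFile(input_file_list).repartition(num_partitions);
	//   PerVolumeMongoDBDocumentsMap map_fn
	//     = new PerVolumeMongoDBDocumentsMap(input_dir, verbosity,
	//                                        progress_accum, progress_step, strict_file_io);
	//   long num_ingested = json_file_list.map(map_fn).count(); // count() forces the ingest to run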

}