package org.hathitrust.extractedfeatures;

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.DoubleAccumulator;
import org.json.JSONArray;
import org.json.JSONObject;

/**
 * Spark flat-map function that expands one volume-level extracted-features
 * JSON file into one Solr "add" document per page.
 *
 * <p>The underscore-prefixed fields used below ({@code _input_dir},
 * {@code _output_dir}, {@code _verbosity}, {@code _progress_accum},
 * {@code _progress_step}) are not declared in this class; presumably they are
 * inherited from {@link BasePerJSON} and populated by its constructor.
 */
class PagedJSON extends BasePerJSON implements FlatMapFunction<String, JSONObject>
{
    private static final long serialVersionUID = 1L;

    /** Creates an instance using the no-argument superclass constructor. */
    public PagedJSON()
    {
        super();
    }

    /**
     * Creates an instance, forwarding every argument unchanged to the
     * superclass constructor.
     *
     * @param input_dir      directory that input file names are resolved against
     * @param solr_url       forwarded to the superclass (not used in this class)
     * @param output_dir     root directory under which per-page directories are created
     * @param verbosity      0 = quiet, &gt;= 1 per-volume messages, &gt;= 2 per-page messages
     * @param progress_accum accumulator incremented once per processed volume
     * @param progress_step  amount added to {@code progress_accum} per volume
     */
    public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity,
                     DoubleAccumulator progress_accum, double progress_step)
    {
        super(input_dir, solr_url, output_dir, verbosity, progress_accum, progress_step);
    }

    /**
     * Reads one volume-level JSON file and converts each of its pages into a
     * Solr add-document.
     *
     * <p>Each returned document additionally carries a
     * {@code filename_json_bz2} entry holding the relative path of the
     * page-level output file ({@code <volume-dir>/pages/page-NNNNNN.json.bz2}).
     * The page-level output directory is created as a side effect, and the
     * progress accumulator is advanced by one step.
     *
     * @param json_file_in file name of the volume JSON, relative to the input directory
     * @return iterator over the Solr add-documents, one per usable page
     */
    @Override
    public Iterator<JSONObject> call(String json_file_in)
    {
        JSONObject extracted_feature_record
            = JSONClusterFileIO.readJSONFile(_input_dir + "/" + json_file_in);

        String volume_id = extracted_feature_record.getString("id");

        JSONObject ef_features = extracted_feature_record.getJSONObject("features");
        int ef_page_count = ef_features.getInt("pageCount");

        if (_verbosity >= 1) {
            System.out.println("Processing: " + json_file_in);
            System.out.println("  pageCount = " + ef_page_count);
        }

        JSONArray ef_pages = ef_features.getJSONArray("pages");
        int ef_num_pages = ef_pages.length();

        // The declared pageCount is metadata and may disagree with the real
        // length of the pages array.  Never index past the end of the array
        // (which would abort the whole volume with a JSONException).
        if (ef_page_count != ef_num_pages) {
            System.err.println("Warning: pageCount (" + ef_page_count
                               + ") differs from number of page entries (" + ef_num_pages
                               + ") in " + json_file_in);
        }
        int pages_to_process = Math.min(ef_page_count, ef_num_pages);

        // Make directory for page-level JSON output
        String json_dir = ClusterFileIO.removeSuffix(json_file_in, ".json.bz2");
        String page_json_dir = json_dir + "/pages";
        ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);

        ArrayList<JSONObject> json_pages = new ArrayList<JSONObject>(ef_num_pages);

        for (int i = 0; i < pages_to_process; i++) {
            String formatted_i = String.format("page-%06d", i);
            String page_id = volume_id + "." + formatted_i;

            if (_verbosity >= 2) {
                System.out.println("  Page: " + page_id);
            }

            String output_json_bz2 = page_json_dir + "/" + formatted_i + ".json.bz2";

            if (i == 0) {
                System.out.println("Sample output JSON page file: " + output_json_bz2);
            }

            // optJSONObject() yields null for a JSON null / non-object entry,
            // whereas getJSONObject() would throw and make the skip branch
            // below unreachable.
            JSONObject ef_page = ef_pages.optJSONObject(i);

            if (ef_page != null) {
                // Convert to Solr add form
                JSONObject solr_add_doc_json
                    = JSONSolrTransform.generateSolrDocJSON(volume_id, page_id, ef_page);
                solr_add_doc_json.put("filename_json_bz2", output_json_bz2);

                json_pages.add(solr_add_doc_json);
            }
            else {
                System.err.println("Skipping: " + page_id);
            }
        }

        _progress_accum.add(_progress_step);

        return json_pages.iterator();
    }
}