package org.hathitrust; import java.io.BufferedReader; import java.util.ArrayList; import java.util.Iterator; import org.apache.spark.api.java.function.FlatMapFunction; import org.json.JSONArray; import org.json.JSONObject; /* class PagedJSON implements Function { private static final long serialVersionUID = 1L; public Boolean call(String s) { return s.contains("a"); } } */ class PagedJSON implements FlatMapFunction { private static final long serialVersionUID = 1L; protected String _input_dir; protected String _output_dir; protected int _verbosity; public PagedJSON(String input_dir, String output_dir, int verbosity) { _input_dir = input_dir; _output_dir = output_dir; _verbosity = verbosity; } protected JSONObject readJSONFile(String filename) { //Path path = Paths.get(filename); StringBuilder sb = new StringBuilder(); try { String str; BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename); while ((str = br.readLine()) != null) { sb.append(str); } br.close(); } catch (Exception e) { e.printStackTrace(); } JSONObject json_obj = new JSONObject(sb.toString()); return json_obj; } public Iterator call(String json_file_in) { JSONObject extracted_feature_record = readJSONFile(json_file_in); // Check output directory for volume exists, and create it if not String id = extracted_feature_record.getString("id"); //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); JSONObject ef_features = extracted_feature_record.getJSONObject("features"); int ef_page_count = ef_features.getInt("pageCount"); if (_verbosity >= 1) { System.out.println("Processing: " + json_file_in); System.out.println(" pageCount = " + ef_page_count); } JSONArray ef_pages = ef_features.getJSONArray("pages"); int ef_num_pages = ef_pages.length(); // Make directory for page-level JSON output String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2"); String page_json_dir = json_dir + "/pages"; //ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); System.out.println("mkdir: " + _output_dir + "/" + page_json_dir); ArrayList ids = new ArrayList(ef_num_pages); for (int i = 0; i < ef_page_count; i++) { String formatted_i = String.format("page-%06d", i); String page_id = id + "." + formatted_i; if (_verbosity >= 2) { System.out.println(" Page: " + page_id); } // create JSON obj of just the page (for now) // write it out ids.add(page_json_dir +"/" + page_id + ".json.bz2"); if (i==0) { System.out.println("Sample output JSON page file: " + page_json_dir +"/" + page_id + ".json.bz2"); } } /* for (int i = 0; i < ef_num_pages; i++) { //String post_id = ef_pages.getJSONObject(i).getString("post_id"); //...... } */ //String pageName = json_obj.getJSONObject("pageInfo").getString("pageName"); /* JSONArray arr = obj.getJSONArray("posts"); for (int i = 0; i < arr.length(); i++) { String post_id = arr.getJSONObject(i).getString("post_id"); ...... } */ ids.add(id); return ids.iterator(); } }