package org.hathitrust;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.json.JSONArray;
import org.json.JSONObject;

/**
 * Maps the filename of a compressed HathiTrust Extracted Features JSON record
 * to the list of page-level IDs in that volume (one ID per page, followed by
 * the volume ID itself).
 */
class PagedJSON implements FlatMapFunction<String, String>
{
	private static final long serialVersionUID = 1L;

	protected String _input_dir;

	public PagedJSON(String input_dir)
	{
		_input_dir = input_dir;
	}

	protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
			throws CompressorException, IOException
	{
		// Open the compressed file through the Hadoop filesystem layer, so both
		// HDFS URIs and local paths are supported
		URI uri = URI.create(fileIn);
		Configuration conf = new Configuration();
		FileSystem file = FileSystem.get(uri, conf);
		FSDataInputStream fin = file.open(new Path(uri));

		BufferedInputStream bis = new BufferedInputStream(fin);
		CompressorInputStream input = new CompressorStreamFactory().createCompressorInputStream(bis);

		return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
	}

	protected JSONObject readJSONFile(String filename)
	{
		StringBuilder sb = new StringBuilder();

		try {
			String str;
			BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
			while ((str = br.readLine()) != null) {
				sb.append(str);
			}
			br.close();
		}
		catch (Exception e) {
			e.printStackTrace();
		}

		return new JSONObject(sb.toString());
	}

	public Iterator<String> call(String s)
	{
		JSONObject extracted_feature_record = readJSONFile(s);

		String id = extracted_feature_record.getString("id");

		// Metadata is read here but not yet used
		JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
		JSONObject ef_features = extracted_feature_record.getJSONObject("features");

		int ef_page_count = ef_features.getInt("pageCount");
		JSONArray ef_pages = ef_features.getJSONArray("pages");
		int ef_num_pages = ef_pages.length();

		// Emit one ID per page; per-page fields could later be read via
		// ef_pages.getJSONObject(i) if page-level processing is needed
		ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
		for (int i = 0; i < ef_page_count; i++) {
			ids.add(id + "." + i);
		}

		// Finally emit the volume-level ID itself
		ids.add(id);

		return ids.iterator();
	}
}
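
/*
 * Minimal usage sketch (an assumption, not part of the original file): shows how
 * PagedJSON might be applied with flatMap over a list of Extracted Features JSON
 * filenames in a Spark 2.x driver. The application name, file-list path, and input
 * directory below are hypothetical placeholders.
 *
 *   SparkConf conf = new SparkConf().setAppName("PagedJSON sketch");
 *   JavaSparkContext jsc = new JavaSparkContext(conf);
 *
 *   // Each line of the file list names one compressed Extracted Features JSON file
 *   JavaRDD<String> json_filenames = jsc.textFile("hdfs://host:port/json-filelist.txt");
 *
 *   // Expand each volume record into its per-page IDs plus the volume ID
 *   JavaRDD<String> page_ids = json_filenames.flatMap(new PagedJSON("hdfs://host:port/ef-json-dir"));
 *
 *   System.out.println("Number of page IDs: " + page_ids.count());
 *
 *   jsc.close();
 */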