source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java@ 30932

Last change on this file since 30932 was 30932, checked in by davidb, 7 years ago

Support both file:// and hdfs://

  • Property svn:executable set to *
File size: 4.0 KB
Line 
1package org.hathitrust;
2
3import java.io.BufferedInputStream;
4import java.io.BufferedReader;
5import java.io.FileInputStream;
6import java.io.IOException;
7import java.io.InputStreamReader;
8import java.net.URI;
9import java.util.ArrayList;
10import java.util.Iterator;
11
12import org.apache.commons.compress.compressors.CompressorException;
13import org.apache.commons.compress.compressors.CompressorInputStream;
14import org.apache.commons.compress.compressors.CompressorStreamFactory;
15import org.apache.hadoop.conf.Configuration;
16import org.apache.hadoop.fs.FSDataInputStream;
17import org.apache.hadoop.fs.FileSystem;
18import org.apache.hadoop.fs.Path;
19import org.apache.spark.api.java.function.FlatMapFunction;
20import org.json.JSONArray;
21import org.json.JSONObject;
22
23/*
24class PagedJSON implements Function<String, Boolean> {
25
26 private static final long serialVersionUID = 1L;
27
28 public Boolean call(String s) { return s.contains("a"); }
29}
30 */
31
32
33class PagedJSON implements FlatMapFunction<String, String>
34{
35 private static final long serialVersionUID = 1L;
36
37 protected String _input_dir;
38
39 public PagedJSON(String input_dir)
40 {
41 _input_dir = input_dir;
42 }
43
44 protected static BufferedInputStream getBufferedInputStream(String fileIn)
45 throws IOException
46 {
47 BufferedInputStream bis = null;
48
49 if (fileIn.startsWith("hdfs://")) {
50 URI uri = URI.create (fileIn);
51 Configuration conf = new Configuration();
52 FileSystem file = FileSystem.get(uri, conf);
53 FSDataInputStream fin = file.open(new Path(uri));
54
55 bis = new BufferedInputStream(fin);
56 }
57 else {
58 FileInputStream fin = new FileInputStream(fileIn);
59 bis = new BufferedInputStream(fin);
60 }
61
62 return bis;
63
64 }
65 protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
66 throws IOException, CompressorException
67 {
68 BufferedInputStream bis = getBufferedInputStream(fileIn);
69 CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis);
70 BufferedReader br = new BufferedReader(new InputStreamReader(comp_input,"UTF8"));
71 return br;
72 }
73
74 protected JSONObject readJSONFile(String filename)
75 {
76 //Path path = Paths.get(filename);
77
78 StringBuilder sb = new StringBuilder();
79
80 try {
81
82 String str;
83 BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
84 while ((str = br.readLine()) != null) {
85 sb.append(str);
86 //System.out.println(str);
87 }
88
89 br.close();
90
91 //System.err.println("*****" + sb.toString());
92
93 /*
94 List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
95
96
97 for (String line : lines) {
98 sb.append(line);
99
100 }
101 */
102
103 }
104 catch (Exception e) {
105 e.printStackTrace();
106 }
107
108 JSONObject json_obj = new JSONObject(sb.toString());
109
110
111 return json_obj;
112
113 //return sb.toString();
114 }
115
116 public Iterator<String> call(String s)
117 {
118 JSONObject extracted_feature_record = readJSONFile(s);
119
120 String id = extracted_feature_record.getString("id");
121
122 JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
123 JSONObject ef_features = extracted_feature_record.getJSONObject("features");
124
125
126 int ef_page_count = ef_features.getInt("pageCount");
127
128 JSONArray ef_pages = ef_features.getJSONArray("pages");
129 int ef_num_pages = ef_pages.length();
130
131 ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
132 for (int i = 0; i < ef_page_count; i++) {
133 ids.add(id + "." + i);
134 }
135
136 /*
137 for (int i = 0; i < ef_num_pages; i++)
138 {
139 //String post_id = ef_pages.getJSONObject(i).getString("post_id");
140 //......
141 }
142 */
143 //String pageName = json_obj.getJSONObject("pageInfo").getString("pageName");
144/*
145 JSONArray arr = obj.getJSONArray("posts");
146 for (int i = 0; i < arr.length(); i++)
147 {
148 String post_id = arr.getJSONObject(i).getString("post_id");
149 ......
150 }
151*/
152
153
154 ids.add(id);
155
156 return ids.iterator();
157 }
158}
159
Note: See TracBrowser for help on using the repository browser.