Changeset 30937 for other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java
- Timestamp:
- 2016-10-26T13:44:38+13:00 (8 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java
r30933 r30937 1 1 package org.hathitrust; 2 2 3 import java.io.BufferedInputStream;4 3 import java.io.BufferedReader; 5 import java.io.FileInputStream;6 import java.io.IOException;7 import java.io.InputStreamReader;8 import java.net.URI;9 4 import java.util.ArrayList; 10 5 import java.util.Iterator; 11 6 12 import org.apache.commons.compress.compressors.CompressorException;13 import org.apache.commons.compress.compressors.CompressorInputStream;14 import org.apache.commons.compress.compressors.CompressorStreamFactory;15 import org.apache.hadoop.conf.Configuration;16 import org.apache.hadoop.fs.FSDataInputStream;17 import org.apache.hadoop.fs.FileSystem;18 import org.apache.hadoop.fs.Path;19 7 import org.apache.spark.api.java.function.FlatMapFunction; 20 8 import org.json.JSONArray; … … 42 30 } 43 31 44 protected static BufferedInputStream getBufferedInputStream(String fileIn)45 throws IOException46 {47 BufferedInputStream bis = null;48 49 if (fileIn.startsWith("hdfs://")) {50 URI uri = URI.create (fileIn);51 Configuration conf = new Configuration();52 FileSystem file = FileSystem.get(uri, conf);53 FSDataInputStream fin = file.open(new Path(uri));54 55 bis = new BufferedInputStream(fin);56 }57 else {58 // Trim 'file://' off the front59 60 String local_file_in = fileIn;61 if (local_file_in.startsWith("file://")) {62 local_file_in = fileIn.substring("file://".length());63 }64 FileInputStream fin = new FileInputStream(local_file_in);65 bis = new BufferedInputStream(fin);66 }67 68 return bis;69 70 }71 protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)72 throws IOException, CompressorException73 {74 BufferedInputStream bis = getBufferedInputStream(fileIn);75 CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis);76 BufferedReader br = new BufferedReader(new InputStreamReader(comp_input,"UTF8"));77 return br;78 }79 80 32 protected JSONObject readJSONFile(String filename) 81 33 { … … 87 39 88 40
String str; 89 BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename);41 BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename); 90 42 while ((str = br.readLine()) != null) { 91 43 sb.append(str); … … 120 72 } 121 73 122 public Iterator<String> call(String s)74 public Iterator<String> call(String json_file_in) 123 75 { 124 JSONObject extracted_feature_record = readJSONFile(s); 76 JSONObject extracted_feature_record = readJSONFile(json_file_in); 77 78 // Check output directory for volume exists, and create it if not 79 125 80 126 81 String id = extracted_feature_record.getString("id");
Note:
See TracChangeset
for help on using the changeset viewer.