Timestamp: 2016-10-26T13:44:38+13:00
Author: davidb
Message: Expanded set of ClusterFileIO methods
File: 1 edited

  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

--- PagedJSON.java (r30933)
+++ PagedJSON.java (r30937)
@@ -1,20 +1,8 @@
 package org.hathitrust;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Iterator;
 
-import org.apache.commons.compress.compressors.CompressorException;
-import org.apache.commons.compress.compressors.CompressorInputStream;
-import org.apache.commons.compress.compressors.CompressorStreamFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.json.JSONArray;
@@ -42,40 +30,4 @@
     }
     
-    protected static BufferedInputStream getBufferedInputStream(String fileIn)
-            throws IOException
-    {
-        BufferedInputStream bis = null;
-        
-        if (fileIn.startsWith("hdfs://")) {
-            URI uri = URI.create (fileIn);
-            Configuration conf = new Configuration();
-            FileSystem file = FileSystem.get(uri, conf);
-            FSDataInputStream fin = file.open(new Path(uri));
-
-            bis = new BufferedInputStream(fin);
-        }
-        else {
-            // Trim 'file://' off the front
-            
-            String local_file_in = fileIn;
-            if (local_file_in.startsWith("file://")) {
-                local_file_in = fileIn.substring("file://".length());
-            }
-            FileInputStream fin = new FileInputStream(local_file_in);
-            bis = new BufferedInputStream(fin);
-        }
-
-        return bis;
-        
-    }
-    protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
-            throws IOException, CompressorException
-    {
-        BufferedInputStream bis = getBufferedInputStream(fileIn);
-        CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis);
-        BufferedReader br = new BufferedReader(new InputStreamReader(comp_input,"UTF8"));
-        return br;
-    }
-    
     protected JSONObject readJSONFile(String filename)
     {
@@ -87,5 +39,5 @@
 
             String str;
-            BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
+            BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
             while ((str = br.readLine()) != null) {
                 sb.append(str);
@@ -120,7 +72,10 @@
     }
 
-    public Iterator<String> call(String s)
+    public Iterator<String> call(String json_file_in)
     {
-        JSONObject extracted_feature_record = readJSONFile(s);
+        JSONObject extracted_feature_record = readJSONFile(json_file_in);
+        
+        // Check output directory for volume exists, and create it if not
+        
 
         String id = extracted_feature_record.getString("id");
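For reference, the helpers removed from PagedJSON above are what the commit message calls the expanded set of ClusterFileIO methods. ClusterFileIO itself is not part of this changeset, so the following is only a sketch of what it plausibly looks like after the move, assuming the method bodies were carried over unchanged; the class layout and public visibility are assumptions.

package org.hathitrust;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: ClusterFileIO is not shown in this changeset. Method bodies are
// copied from the helpers removed from PagedJSON; public visibility is assumed.
public class ClusterFileIO
{
    // Open a buffered stream from either an hdfs:// URI or a local (file://) path.
    public static BufferedInputStream getBufferedInputStream(String fileIn)
            throws IOException
    {
        BufferedInputStream bis = null;

        if (fileIn.startsWith("hdfs://")) {
            // Read via the Hadoop FileSystem API
            URI uri = URI.create(fileIn);
            Configuration conf = new Configuration();
            FileSystem file = FileSystem.get(uri, conf);
            FSDataInputStream fin = file.open(new Path(uri));

            bis = new BufferedInputStream(fin);
        }
        else {
            // Trim 'file://' off the front and read from the local file system
            String local_file_in = fileIn;
            if (local_file_in.startsWith("file://")) {
                local_file_in = fileIn.substring("file://".length());
            }
            FileInputStream fin = new FileInputStream(local_file_in);
            bis = new BufferedInputStream(fin);
        }

        return bis;
    }

    // Auto-detect the compression format and wrap the decompressed stream
    // in a UTF-8 reader.
    public static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
            throws IOException, CompressorException
    {
        BufferedInputStream bis = getBufferedInputStream(fileIn);
        CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis);
        BufferedReader br = new BufferedReader(new InputStreamReader(comp_input, "UTF8"));
        return br;
    }
}

PagedJSON.readJSONFile then uses it exactly as shown in the diff above: ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename).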