Show
Ignore:
Timestamp:
26.10.2016 13:44:38 (3 years ago)
Author:
davidb
Message:

Expanded set of ClusterFileIO methods

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30933 r30937  
    11package org.hathitrust; 
    22 
    3 import java.io.BufferedInputStream; 
    43import java.io.BufferedReader; 
    5 import java.io.FileInputStream; 
    6 import java.io.IOException; 
    7 import java.io.InputStreamReader; 
    8 import java.net.URI; 
    94import java.util.ArrayList; 
    105import java.util.Iterator; 
    116 
    12 import org.apache.commons.compress.compressors.CompressorException; 
    13 import org.apache.commons.compress.compressors.CompressorInputStream; 
    14 import org.apache.commons.compress.compressors.CompressorStreamFactory; 
    15 import org.apache.hadoop.conf.Configuration; 
    16 import org.apache.hadoop.fs.FSDataInputStream; 
    17 import org.apache.hadoop.fs.FileSystem; 
    18 import org.apache.hadoop.fs.Path; 
    197import org.apache.spark.api.java.function.FlatMapFunction; 
    208import org.json.JSONArray; 
     
    4230    } 
    4331     
    44     protected static BufferedInputStream getBufferedInputStream(String fileIn)  
    45             throws IOException  
    46     { 
    47         BufferedInputStream bis = null; 
    48          
    49         if (fileIn.startsWith("hdfs://")) { 
    50             URI uri = URI.create (fileIn); 
    51             Configuration conf = new Configuration(); 
    52             FileSystem file = FileSystem.get(uri, conf); 
    53             FSDataInputStream fin = file.open(new Path(uri)); 
    54  
    55             bis = new BufferedInputStream(fin); 
    56         } 
    57         else { 
    58             // Trim 'file://' off the front 
    59              
    60             String local_file_in = fileIn; 
    61             if (local_file_in.startsWith("file://")) { 
    62                 local_file_in = fileIn.substring("file://".length()); 
    63             } 
    64             FileInputStream fin = new FileInputStream(local_file_in); 
    65             bis = new BufferedInputStream(fin); 
    66         } 
    67  
    68         return bis; 
    69          
    70     } 
    71     protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)  
    72             throws IOException, CompressorException  
    73     { 
    74         BufferedInputStream bis = getBufferedInputStream(fileIn); 
    75         CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis); 
    76         BufferedReader br = new BufferedReader(new InputStreamReader(comp_input,"UTF8")); 
    77         return br; 
    78     } 
    79      
    8032    protected JSONObject readJSONFile(String filename) 
    8133    { 
     
    8739 
    8840            String str; 
    89             BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename); 
     41            BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename); 
    9042            while ((str = br.readLine()) != null) { 
    9143                sb.append(str); 
     
    12072    } 
    12173 
    122     public Iterator<String> call(String s)  
     74    public Iterator<String> call(String json_file_in)  
    12375    {  
    124         JSONObject extracted_feature_record = readJSONFile(s); 
     76        JSONObject extracted_feature_record = readJSONFile(json_file_in); 
     77         
     78        // Check output directory for volume exists, and create it if not 
     79         
    12580         
    12681        String id = extracted_feature_record.getString("id"); 
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30934 r30937  
    1919    private static final long serialVersionUID = 1L; 
    2020 
    21     protected String _input_dir; 
    22     protected String _json_list_filename; 
    23     protected String _output_dir; 
    24     protected int    _verbosity; 
    25      
     21    protected String _input_dir; 
     22    protected String _json_list_filename; 
     23    protected String _output_dir; 
     24     
     25    protected int    _verbosity; 
     26 
    2627    public PrepareForIngest(String input_dir, String json_list_filename, String output_dir, int verbosity) 
    2728    { 
     
    3435    public void exec() 
    3536    {    
    36         SparkConf conf = new SparkConf().setAppName("HTRC-EF: Prepare for Solr Ingest"); 
    37         JavaSparkContext sc = new JavaSparkContext(conf); 
     37        String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest"; 
     38        spark_app_name += "[" + _json_list_filename + "]"; 
    3839 
    39         JavaRDD<String> json_list_data = sc.textFile(_json_list_filename).cache(); 
     40        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
     41        JavaSparkContext jsc = new JavaSparkContext(conf); 
     42        ClusterFileIO.init(_input_dir); 
     43         
     44        // Check output directory exists, and create it if not 
     45     
     46                 
     47        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 
    4048 
    4149        JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir)); 
    42          
     50 
    4351 
    4452        //long numAs = json_list_data.filter(new ContainsA()).count(); 
    4553 
    46      
    47 /* 
     54 
     55        /* 
    4856        long numBs = json_list_data.filter(new Function<String, Boolean>() { 
    4957            public Boolean call(String s) { return s.contains("b"); } 
     
    5159 
    5260        System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs); 
    53     */ 
     61         */ 
    5462        long num_ids = json_ids.count(); 
    5563        System.out.println(""); 
     
    5866        System.out.println("############"); 
    5967        System.out.println(""); 
    60          
    61         sc.close(); 
     68 
     69        jsc.close(); 
    6270    } 
    6371 
     
    9098        catch (ParseException e) { 
    9199            System.err.println(e.getMessage()); 
    92             //System.err.println("Usage: RUN.bat [options] json-file-list.txt input-dir output-dir"); 
    93             formatter.printHelp("RUN.bash/RUN.bat [options] json-file-list.txt input-dir output-dir", options); 
    94             //System.err.println("  Where 'filename.txt' contains a list of JSON files, one per line,"); 
    95             //System.err.println("  which use the HathiTrust Extracted Feature JSON format"); 
    96  
     100            formatter.printHelp("RUN.bash [options] json-file-list.txt input-dir output-dir", options); 
    97101            System.exit(1); 
    98102            return; 
     
    110114 
    111115 
     116        String[] filtered_args = cmd.getArgs(); 
    112117 
    113         String[] filtered_args = cmd.getArgs(); 
    114          
    115118        if (filtered_args.length != 3) { 
    116                 //System.err.println("Usage: RUN.bat [options] json-filelist.txt input-dir output-dir"); 
    117                 formatter.printHelp("RUN.bash/RUN.bat  [options] json-filelist.txt input-dir output-dir", options); 
    118  
    119             //System.err.println("Usage: RUN.bat [options] input-dir output-dir"); 
    120             //System.err.println("  Where 'filename.txt' contains a list of JSON files, one per line,"); 
    121             //System.err.println("  which use the HathiTrust Extracted Feature JSON format"); 
    122                 System.exit(1); 
     119            formatter.printHelp("RUN.bash [options] json-filelist.txt input-dir output-dir", options); 
     120            System.exit(1); 
    123121        } 
    124122        String json_list_filename = filtered_args[0]; 
    125123        String input_dir  = filtered_args[1]; 
    126124        String output_dir = filtered_args[2]; 
    127          
     125 
    128126 
    129127        //String json_list_filename = cmd.getArgs()[0]; // args[0];