Changeset 30937


Ignore:
Timestamp:
2016-10-26T13:44:38+13:00 (8 years ago)
Author:
davidb
Message:

Expanded set of ClusterFileIO methods

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30933 r30937  
    11package org.hathitrust;
    22
    3 import java.io.BufferedInputStream;
    43import java.io.BufferedReader;
    5 import java.io.FileInputStream;
    6 import java.io.IOException;
    7 import java.io.InputStreamReader;
    8 import java.net.URI;
    94import java.util.ArrayList;
    105import java.util.Iterator;
    116
    12 import org.apache.commons.compress.compressors.CompressorException;
    13 import org.apache.commons.compress.compressors.CompressorInputStream;
    14 import org.apache.commons.compress.compressors.CompressorStreamFactory;
    15 import org.apache.hadoop.conf.Configuration;
    16 import org.apache.hadoop.fs.FSDataInputStream;
    17 import org.apache.hadoop.fs.FileSystem;
    18 import org.apache.hadoop.fs.Path;
    197import org.apache.spark.api.java.function.FlatMapFunction;
    208import org.json.JSONArray;
     
    4230    }
    4331   
    44     protected static BufferedInputStream getBufferedInputStream(String fileIn)
    45             throws IOException
    46     {
    47         BufferedInputStream bis = null;
    48        
    49         if (fileIn.startsWith("hdfs://")) {
    50             URI uri = URI.create (fileIn);
    51             Configuration conf = new Configuration();
    52             FileSystem file = FileSystem.get(uri, conf);
    53             FSDataInputStream fin = file.open(new Path(uri));
    54 
    55             bis = new BufferedInputStream(fin);
    56         }
    57         else {
    58             // Trim 'file://' off the front
    59            
    60             String local_file_in = fileIn;
    61             if (local_file_in.startsWith("file://")) {
    62                 local_file_in = fileIn.substring("file://".length());
    63             }
    64             FileInputStream fin = new FileInputStream(local_file_in);
    65             bis = new BufferedInputStream(fin);
    66         }
    67 
    68         return bis;
    69        
    70     }
    71     protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
    72             throws IOException, CompressorException
    73     {
    74         BufferedInputStream bis = getBufferedInputStream(fileIn);
    75         CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis);
    76         BufferedReader br = new BufferedReader(new InputStreamReader(comp_input,"UTF8"));
    77         return br;
    78     }
    79    
    8032    protected JSONObject readJSONFile(String filename)
    8133    {
     
    8739
    8840            String str;
    89             BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
     41            BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename);
    9042            while ((str = br.readLine()) != null) {
    9143                sb.append(str);
     
    12072    }
    12173
    122     public Iterator<String> call(String s)
     74    public Iterator<String> call(String json_file_in)
    12375    {
    124         JSONObject extracted_feature_record = readJSONFile(s);
     76        JSONObject extracted_feature_record = readJSONFile(json_file_in);
     77       
     78        // Check output directory for volume exists, and create it if not
     79       
    12580       
    12681        String id = extracted_feature_record.getString("id");
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30934 r30937  
    1919    private static final long serialVersionUID = 1L;
    2020
    21     protected String _input_dir;
    22     protected String _json_list_filename;
    23     protected String _output_dir;
    24     protected int    _verbosity;
    25    
     21    protected String _input_dir;
     22    protected String _json_list_filename;
     23    protected String _output_dir;
     24   
     25    protected int    _verbosity;
     26
    2627    public PrepareForIngest(String input_dir, String json_list_filename, String output_dir, int verbosity)
    2728    {
     
    3435    public void exec()
    3536    {   
    36         SparkConf conf = new SparkConf().setAppName("HTRC-EF: Prepare for Solr Ingest");
    37         JavaSparkContext sc = new JavaSparkContext(conf);
     37        String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
     38        spark_app_name += "[" + _json_list_filename + "]";
    3839
    39         JavaRDD<String> json_list_data = sc.textFile(_json_list_filename).cache();
     40        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     41        JavaSparkContext jsc = new JavaSparkContext(conf);
     42        ClusterFileIO.init(_input_dir);
     43       
     44        // Check output directory exists, and create it if not
     45   
     46               
     47        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
    4048
    4149        JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir));
    42        
     50
    4351
    4452        //long numAs = json_list_data.filter(new ContainsA()).count();
    4553
    46    
    47 /*
     54
     55        /*
    4856        long numBs = json_list_data.filter(new Function<String, Boolean>() {
    4957            public Boolean call(String s) { return s.contains("b"); }
     
    5159
    5260        System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs);
    53     */
     61         */
    5462        long num_ids = json_ids.count();
    5563        System.out.println("");
     
    5866        System.out.println("############");
    5967        System.out.println("");
    60        
    61         sc.close();
     68
     69        jsc.close();
    6270    }
    6371
     
    9098        catch (ParseException e) {
    9199            System.err.println(e.getMessage());
    92             //System.err.println("Usage: RUN.bat [options] json-file-list.txt input-dir output-dir");
    93             formatter.printHelp("RUN.bash/RUN.bat [options] json-file-list.txt input-dir output-dir", options);
    94             //System.err.println("  Where 'filename.txt' contains a list of JSON files, one per line,");
    95             //System.err.println("  which use the HathiTrust Extracted Feature JSON format");
    96 
     100            formatter.printHelp("RUN.bash [options] json-file-list.txt input-dir output-dir", options);
    97101            System.exit(1);
    98102            return;
     
    110114
    111115
     116        String[] filtered_args = cmd.getArgs();
    112117
    113         String[] filtered_args = cmd.getArgs();
    114        
    115118        if (filtered_args.length != 3) {
    116                 //System.err.println("Usage: RUN.bat [options] json-filelist.txt input-dir output-dir");
    117                 formatter.printHelp("RUN.bash/RUN.bat  [options] json-filelist.txt input-dir output-dir", options);
    118 
    119             //System.err.println("Usage: RUN.bat [options] input-dir output-dir");
    120             //System.err.println("  Where 'filename.txt' contains a list of JSON files, one per line,");
    121             //System.err.println("  which use the HathiTrust Extracted Feature JSON format");
    122                 System.exit(1);
     119            formatter.printHelp("RUN.bash [options] json-filelist.txt input-dir output-dir", options);
     120            System.exit(1);
    123121        }
    124122        String json_list_filename = filtered_args[0];
    125123        String input_dir  = filtered_args[1];
    126124        String output_dir = filtered_args[2];
    127        
     125
    128126
    129127        //String json_list_filename = cmd.getArgs()[0]; // args[0];
Note: See TracChangeset for help on using the changeset viewer.