Changeset 30945


Ignore:
Timestamp:
2016-10-26T15:37:24+13:00 (5 years ago)
Author:
davidb
Message:

Getting closer to writing out JSON files

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java

    r30941 r30945  
    2121import org.apache.hadoop.fs.FileSystem;
    2222import org.apache.hadoop.fs.Path;
    23 import org.apache.spark.api.java.JavaSparkContext;
    2423
    2524public class ClusterFileIO {
     
    5857    */
    5958   
    60     public static FileSystem getFileSystemInstance(String input_dir)
     59    protected static FileSystem getFileSystemInstance(String input_dir)
    6160    {
    6261        if (__fs == null) {
     
    8281    }
    8382   
    84     protected static boolean exists(String file)
     83    public static boolean exists(String file)
    8584    {
    8685        FileSystem fs = getFileSystemInstance(file);
     
    10099        return exists;
    101100    }
    102     protected static BufferedInputStream getBufferedInputStream(String fileIn)
     101   
     102    public static String removeSuffix(String file,String suffix)
     103    {
     104        return file.substring(0,file.length() - suffix.length());
     105    }
     106   
     107    public static boolean createDirectoryAll(String dir)
     108    {
     109        FileSystem fs = getFileSystemInstance(dir);
     110        boolean created_dir = false;
     111
     112        if (!exists(dir)) {
     113            try {
     114                URI uri = new URI(dir);
     115                Path path = new Path(uri);
     116                fs.mkdirs(path);
     117                created_dir = true;
     118            } catch (URISyntaxException e) {
     119                e.printStackTrace();
     120            } catch (IOException e) {
     121                e.printStackTrace();
     122            }
     123        }
     124       
     125        return created_dir;
     126    }
     127   
     128    public static BufferedInputStream getBufferedInputStream(String fileIn)
    103129            throws IOException
    104130    {
     
    137163    }
    138164
    139     protected static BufferedOutputStream getBufferedOutputStream(String fileOut)
     165    public static BufferedOutputStream getBufferedOutputStream(String fileOut)
    140166            throws IOException
    141167    {
     
    163189    }
    164190   
    165     protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
     191    public static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
    166192            throws IOException, CompressorException
    167193    {
     
    172198    }
    173199
    174     protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
     200    public static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
    175201            throws IOException, CompressorException
    176202    {
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30942 r30945  
    2424
    2525    protected String _input_dir;
     26    protected String _output_dir;
    2627    protected int    _verbosity;
    2728   
    28     public PagedJSON(String input_dir, int verbosity)
     29    public PagedJSON(String input_dir, String output_dir, int verbosity)
    2930    {
    30         _input_dir = input_dir;
    31         _verbosity = verbosity;
     31        _input_dir  = input_dir;
     32        _output_dir = output_dir;
     33        _verbosity  = verbosity;
    3234    }
    3335   
     
    4446            while ((str = br.readLine()) != null) {
    4547                sb.append(str);
    46                 //System.out.println(str);
    4748            }
    4849
    4950            br.close();
    50            
    51             //System.err.println("*****" + sb.toString());
    52                    
    53             /*
    54             List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
    55 
    56 
    57             for (String line : lines) {
    58                 sb.append(line);
    59 
    60             }
    61             */
    62            
    6351        }
    6452        catch (Exception e) {
     
    6856        JSONObject json_obj = new JSONObject(sb.toString());
    6957       
    70        
    7158        return json_obj;
    72        
    73         //return sb.toString();
    7459    }
    7560
     
    8368        String id = extracted_feature_record.getString("id");
    8469       
    85         JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
     70        //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
    8671        JSONObject ef_features = extracted_feature_record.getJSONObject("features");
    8772       
     
    9782        int ef_num_pages = ef_pages.length();
    9883       
     84        // Make directory for page-level JSON output
     85        String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2");
     86        String page_json_dir = json_dir + "/pages";
     87        //ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
     88        System.out.println("mkdir: " + _output_dir + "/" + page_json_dir);
     89       
    9990        ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
    10091        for (int i = 0; i < ef_page_count; i++) {
    101             ids.add(id + "." + i);
     92            String formatted_i = String.format("page-%06d", i);
     93            String page_id = id + "." + formatted_i;
     94       
     95            if (_verbosity >= 2) {
     96              System.out.println("  Page: " + page_id);
     97            }
     98           
     99            // create JSON obj of just the page (for now)
     100            // write it out
     101           
     102            ids.add(page_json_dir +"/" + page_id + ".json.bz2");
     103           
     104            if (i==0) {
     105                System.out.println("Sample output JSON page file: " + page_json_dir +"/" + page_id + ".json.bz2");
     106            }
    102107        }
    103108       
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30944 r30945  
    11package org.hathitrust;
    22
    3 import java.io.IOException;
    43import java.io.Serializable;
    5 import java.nio.charset.StandardCharsets;
    6 import java.nio.file.Files;
    7 import java.nio.file.Path;
    8 import java.nio.file.Paths;
    9 import java.util.List;
    10 
    114import org.apache.commons.cli.*;
    125
    136import org.apache.spark.api.java.*;
    147import org.apache.spark.SparkConf;
    15 import org.apache.spark.api.java.function.Function;
    168
    179public class PrepareForIngest implements Serializable
     
    1911    private static final long serialVersionUID = 1L;
    2012
     13    public static final int NUM_PARTITIONS = 6; // default would appear to be 2
     14   
    2115    protected String _input_dir;
    2216    protected String _json_list_filename;
     
    4034        SparkConf conf = new SparkConf().setAppName(spark_app_name);
    4135        JavaSparkContext jsc = new JavaSparkContext(conf);
    42         //ClusterFileIO.init(_input_dir);
    4336       
    44         // Check output directory exists, and create it if not
    45        
    46        
    47         if (_verbosity >= 1) {
     37        if (_verbosity >= 2) {
    4838            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    4939            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    5040        }
    5141               
    52         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,6).cache();
     42        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
    5343
    54         JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir,_verbosity));
     44        PagedJSON paged_json = new PagedJSON(_input_dir,_output_dir,_verbosity);
     45        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
    5546
     47        json_ids.saveAsTextFile("foo");
    5648
    57         //long numAs = json_list_data.filter(new ContainsA()).count();
    58 
    59 
    60         /*
    61         long numBs = json_list_data.filter(new Function<String, Boolean>() {
    62             public Boolean call(String s) { return s.contains("b"); }
    63         }).count();
    64 
    65         System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs);
    66          */
    6749        long num_ids = json_ids.count();
    6850        System.out.println("");
     
    8163
    8264        //.withType(Integer.class)
    83 
     65/*
    8466        options.addOption(OptionBuilder.withLongOpt("verbosity")
    8567                .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]")
     
    8870                .isRequired(false)
    8971                .create());
    90 
     72*/
    9173        //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use");
    9274        //num_cores_opt.setRequired(false);
    9375        //options.addOption(num_cores_opt);
    9476
     77        Option verbosity_opt = new Option("v", "verbosity", true,
     78                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
     79        verbosity_opt.setRequired(false);
     80        options.addOption(verbosity_opt);
     81       
    9582        //CommandLineParser parser = new DefaultParser(); // 1.3 and above
    96         CommandLineParser parser = new GnuParser();
     83       
     84        // need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
     85        CommandLineParser parser = new GnuParser();
    9786        HelpFormatter formatter = new HelpFormatter();
    9887        CommandLine cmd;
     
    112101
    113102        //cmd.hasOption("json-filelist")
     103       
    114104        String verbosity_str = cmd.getOptionValue("verbosity","0");
    115105        int verbosity = Integer.parseInt(verbosity_str);
    116 
    117         //System.out.println(inputFilePath);
    118         //System.out.println(outputFilePath);
    119 
    120106
    121107        String[] filtered_args = cmd.getArgs();
     
    129115        String output_dir = filtered_args[2];
    130116
    131 
    132         //String json_list_filename = cmd.getArgs()[0]; // args[0];
    133         //String json_list_filename = args[0];
    134         //int num_cores = 2;
    135 
    136117        PrepareForIngest prep_for_ingest = new PrepareForIngest(input_dir,json_list_filename,output_dir,verbosity);
    137118        prep_for_ingest.exec();
Note: See TracChangeset for help on using the changeset viewer.