Ignore:
Timestamp:
2016-10-26T15:37:24+13:00 (8 years ago)
Author:
davidb
Message:

Getting closer to writing out JSON files

File:
1 edited

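Legend:

Unmodified (line carries both an old r30944 and a new r30945 line number)
Added (line carries only a new r30945 line number)
Removed (line carries only an old r30944 line number)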
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30944 r30945  
    11package org.hathitrust;
    22
    3 import java.io.IOException;
    43import java.io.Serializable;
    5 import java.nio.charset.StandardCharsets;
    6 import java.nio.file.Files;
    7 import java.nio.file.Path;
    8 import java.nio.file.Paths;
    9 import java.util.List;
    10 
    114import org.apache.commons.cli.*;
    125
    136import org.apache.spark.api.java.*;
    147import org.apache.spark.SparkConf;
    15 import org.apache.spark.api.java.function.Function;
    168
    179public class PrepareForIngest implements Serializable
     
    1911    private static final long serialVersionUID = 1L;
    2012
     13    public static final int NUM_PARTITIONS = 6; // default would appear to be 2
     14   
    2115    protected String _input_dir;
    2216    protected String _json_list_filename;
     
    4034        SparkConf conf = new SparkConf().setAppName(spark_app_name);
    4135        JavaSparkContext jsc = new JavaSparkContext(conf);
    42         //ClusterFileIO.init(_input_dir);
    4336       
    44         // Check output directory exists, and create it if not
    45        
    46        
    47         if (_verbosity >= 1) {
     37        if (_verbosity >= 2) {
    4838            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    4939            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    5040        }
    5141               
    52         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,6).cache();
     42        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
    5343
    54         JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir,_verbosity));
     44        PagedJSON paged_json = new PagedJSON(_input_dir,_output_dir,_verbosity);
     45        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
    5546
     47        json_ids.saveAsTextFile("foo");
    5648
    57         //long numAs = json_list_data.filter(new ContainsA()).count();
    58 
    59 
    60         /*
    61         long numBs = json_list_data.filter(new Function<String, Boolean>() {
    62             public Boolean call(String s) { return s.contains("b"); }
    63         }).count();
    64 
    65         System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs);
    66          */
    6749        long num_ids = json_ids.count();
    6850        System.out.println("");
     
    8163
    8264        //.withType(Integer.class)
    83 
     65/*
    8466        options.addOption(OptionBuilder.withLongOpt("verbosity")
    8567                .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]")
     
    8870                .isRequired(false)
    8971                .create());
    90 
     72*/
    9173        //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use");
    9274        //num_cores_opt.setRequired(false);
    9375        //options.addOption(num_cores_opt);
    9476
     77        Option verbosity_opt = new Option("v", "verbosity", true,
     78                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
     79        verbosity_opt.setRequired(false);
     80        options.addOption(verbosity_opt);
     81       
    9582        //CommandLineParser parser = new DefaultParser(); // 1.3 and above
    96         CommandLineParser parser = new GnuParser();
     83       
     84        // need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
     85        CommandLineParser parser = new GnuParser();
    9786        HelpFormatter formatter = new HelpFormatter();
    9887        CommandLine cmd;
     
    112101
    113102        //cmd.hasOption("json-filelist")
     103       
    114104        String verbosity_str = cmd.getOptionValue("verbosity","0");
    115105        int verbosity = Integer.parseInt(verbosity_str);
    116 
    117         //System.out.println(inputFilePath);
    118         //System.out.println(outputFilePath);
    119 
    120106
    121107        String[] filtered_args = cmd.getArgs();
     
    129115        String output_dir = filtered_args[2];
    130116
    131 
    132         //String json_list_filename = cmd.getArgs()[0]; // args[0];
    133         //String json_list_filename = args[0];
    134         //int num_cores = 2;
    135 
    136117        PrepareForIngest prep_for_ingest = new PrepareForIngest(input_dir,json_list_filename,output_dir,verbosity);
    137118        prep_for_ingest.exec();
Note: See TracChangeset for help on using the changeset viewer.