package org.hathitrust;

import java.io.Serializable;

import org.apache.commons.cli.*;

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;

/**
 * Spark driver that reads a list of HathiTrust Extracted Features JSON files and
 * prepares page-level Solr JSON, either POSTing it to a Solr URL and/or saving
 * BZipped Solr JSON files to an output directory.
 */
public class PrepareForIngest implements Serializable
{
    private static final long serialVersionUID = 1L;

    public static final int NUM_PARTITIONS = 6; // default would appear to be 2

    protected String _input_dir;
    protected String _json_list_filename;
    protected String _solr_url;
    protected String _output_dir;

    protected int _verbosity;

    public PrepareForIngest(String input_dir, String json_list_filename,
                            String solr_url, String output_dir, int verbosity)
    {
        _input_dir = input_dir;
        // Fall back to the input directory if no explicit file list is given
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;
        _solr_url = solr_url; // was previously left unassigned, leaving _solr_url null in exec()
        _output_dir = output_dir;
        _verbosity = verbosity;
    }

    public void exec()
    {
        String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
        spark_app_name += " [" + _json_list_filename + "]";

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        if (_verbosity >= 2) {
            System.out.println("Default Minimum Partitions: " + jsc.defaultMinPartitions());
            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
        }

        // One JSON filename per line in the file list
        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, NUM_PARTITIONS).cache();

        // Map each volume-level JSON file to the page ids of the Solr JSON it generates
        PagedJSON paged_json = new PagedJSON(_input_dir, _solr_url, _output_dir, _verbosity);
        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();

        long num_ids = json_ids.count();
        System.out.println("");
        System.out.println("############");
        System.out.println("# Number of page ids: " + num_ids);
        System.out.println("############");
        System.out.println("");

        String rdd_save_file = "rdd-solr-json-page-files";
        json_ids.saveAsTextFile(rdd_save_file);
        System.out.println("############");
        System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
        System.out.println("# " + rdd_save_file);
        System.out.println("############");
        System.out.println("");

        jsc.close();
    }

    public static void print_usage(HelpFormatter formatter, Options options)
    {
        // Argument order matches main(): the JSON file list comes first, then the input directory
        formatter.printHelp("RUN.bash [options] json-filelist.txt input-dir", options);
    }

    public static void main(String[] args)
    {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        Option output_dir_opt = new Option("o", "output-dir", true,
                "If specified, save BZipped Solr JSON files to this directory");
        output_dir_opt.setRequired(false);
        options.addOption(output_dir_opt);

        Option solr_url_opt = new Option("u", "solr-url", true,
                "If specified, the URL to post the Solr JSON data to");
        solr_url_opt.setRequired(false);
        options.addOption(solr_url_opt);

        Option dry_run_opt = new Option("r", "dry-run", false,
                "Used to initiate a 'dry-run' where the files are all read in, but nothing is ingested/saved");
        dry_run_opt.setRequired(false);
        options.addOption(dry_run_opt);

        // Need to work with CLI v1.2, as this is the JAR that is bundled with Hadoop/Spark
        CommandLineParser parser = new GnuParser();
        //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

        HelpFormatter formatter = new HelpFormatter();
        CommandLine cmd = null;

        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            print_usage(formatter, options);
            System.exit(1); // rather than 'return', which triggers a 'cmd may not be assigned' compiler error in Eclipse
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "0");
        int verbosity = Integer.parseInt(verbosity_str);

        String output_dir = cmd.getOptionValue("output-dir", null);
        String solr_url   = cmd.getOptionValue("solr-url", null);
        boolean dry_run   = cmd.hasOption("dry-run");

        String[] filtered_args = cmd.getArgs();
        if (filtered_args.length != 2) {
            print_usage(formatter, options);
            System.exit(1);
        }

        if (!dry_run && (output_dir == null) && (solr_url == null)) {
            System.err.println("Need to specify either --solr-url or --output-dir, otherwise generated files are not ingested/saved");
            print_usage(formatter, options);
            System.exit(1); // avoid running a Spark job whose output would be discarded
        }

        String json_list_filename = filtered_args[0];
        String input_dir = filtered_args[1];

        PrepareForIngest prep_for_ingest
            = new PrepareForIngest(input_dir, json_list_filename, solr_url, output_dir, verbosity);
        prep_for_ingest.exec();
    }
}
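
// ---------------------------------------------------------------------------
// PagedJSON is referenced above but not defined in this file. The sketch below
// is only an illustrative assumption of the minimal shape it needs for
// json_list_data.flatMap(paged_json) to compile against Spark 1.x: a
// FlatMapFunction<String, String> that maps one line of the JSON file list (a
// per-volume JSON filename) to the Solr page ids generated from it. The real
// class in this project additionally builds the per-page Solr JSON and posts
// it to _solr_url and/or writes BZipped files under _output_dir; everything
// here other than the constructor arguments and the call() signature is
// hypothetical.
//
// import java.util.ArrayList;
// import java.util.List;
// import org.apache.spark.api.java.function.FlatMapFunction;
//
// class PagedJSON implements FlatMapFunction<String, String>
// {
//     private static final long serialVersionUID = 1L;
//
//     protected String _input_dir;
//     protected String _solr_url;
//     protected String _output_dir;
//     protected int    _verbosity;
//
//     public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity) {
//         _input_dir  = input_dir;
//         _solr_url   = solr_url;
//         _output_dir = output_dir;
//         _verbosity  = verbosity;
//     }
//
//     // Spark 1.x FlatMapFunction returns an Iterable; under Spark 2.x this
//     // signature changes to Iterator<String>.
//     public Iterable<String> call(String json_filename) throws Exception {
//         List<String> page_ids = new ArrayList<String>();
//         // ... open _input_dir + "/" + json_filename, build one Solr JSON doc
//         //     per page, ingest/save it, and record each page id ...
//         return page_ids;
//     }
// }
//
// Example invocation (illustrative only; RUN.bash is assumed to wrap spark-submit):
//   spark-submit --class org.hathitrust.PrepareForIngest <app-jar> \
//       --output-dir <solr-json-output-dir> <json-filelist.txt> <input-dir>
// ---------------------------------------------------------------------------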