package org.hathitrust;

import java.io.Serializable;

import org.apache.commons.cli.*;

import org.apache.spark.api.java.*;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.SparkConf;

public class PrepareForIngest implements Serializable
{
    private static final long serialVersionUID = 1L;

    // Default number of partitions would appear to be 2
    public static final int NUM_PARTITIONS = 6;

    protected String _input_dir;
    protected String _json_list_filename;
    protected String _solr_url;
    protected String _output_dir;

    protected int _verbosity;

    public PrepareForIngest(String input_dir, String json_list_filename,
                            String solr_url, String output_dir, int verbosity)
    {
        _input_dir = input_dir;
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

        _solr_url   = solr_url;
        _output_dir = output_dir;
        _verbosity  = verbosity;
    }

    public void exec()
    {
        String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
        spark_app_name += " [" + _json_list_filename + "]";

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        if (_verbosity >= 2) {
            System.out.println("Default Minimum Partitions: " + jsc.defaultMinPartitions());
            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
        }

        // Read in the list of JSON volume files to process, one filename per line
        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, NUM_PARTITIONS).cache();

        long num_volumes = json_list_data.count();
        double per_vol = 100.0 / (double)num_volumes;

        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");

        PagedJSON paged_json = new PagedJSON(_input_dir, _solr_url, _output_dir, _verbosity,
                                             progress_accum, per_vol);
        //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();

        json_list_data.foreach(paged_json);

        /*
        System.out.println("");
        System.out.println("############");
        System.out.println("# Progress Accumulator: " + progress_accum.value());
        System.out.println("############");
        System.out.println("");
        */

        //long num_ids = json_ids.count();
        long num_ids = num_volumes;
        System.out.println("");
        System.out.println("############");
        System.out.println("# Number of page ids: " + num_ids);
        System.out.println("############");
        System.out.println("");

        /*
        if (_output_dir != null) {
            String rdd_save_file = "rdd-solr-json-page-files";
            json_ids.saveAsTextFile(rdd_save_file);
            System.out.println("############");
            System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
            System.out.println("#   " + rdd_save_file);
            System.out.println("############");
            System.out.println("");
        }
        */

        jsc.close();
    }

    public static void print_usage(HelpFormatter formatter, Options options)
    {
        formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
    }

    public static void main(String[] args)
    {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        Option output_dir_opt = new Option("o", "output-dir", true,
                "If specified, save BZipped Solr JSON files to this directory");
        output_dir_opt.setRequired(false);
        options.addOption(output_dir_opt);

        Option solr_url_opt = new Option("u", "solr-url", true,
                "If specified, the URL to post the Solr JSON data to");
        solr_url_opt.setRequired(false);
        options.addOption(solr_url_opt);

        Option read_only_opt = new Option("r", "read-only", false,
                "Used to initiate a run where the files are all read in, but nothing is ingested/saved");
        read_only_opt.setRequired(false);
        options.addOption(read_only_opt);

        // Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
        CommandLineParser parser = new GnuParser();
        //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

        HelpFormatter formatter = new HelpFormatter();
        CommandLine cmd = null;

        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            print_usage(formatter, options);
            System.exit(1);
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "0");
        int verbosity = Integer.parseInt(verbosity_str);

        String output_dir = cmd.getOptionValue("output-dir", null);
        String solr_url   = cmd.getOptionValue("solr-url", null);

        // Test for the option name registered above ("read-only"), so the flag is actually honoured
        boolean read_only = cmd.hasOption("read-only");

        String[] filtered_args = cmd.getArgs();

        if (filtered_args.length != 2) {
            print_usage(formatter, options);
            System.exit(1);
        }

        if (!read_only && ((output_dir == null) && (solr_url == null))) {
            System.err.println("Need to specify either --solr-url or --output-dir otherwise generated files are not ingested/saved");
            print_usage(formatter, options);
            System.exit(1);
        }

        String input_dir          = filtered_args[0];
        String json_list_filename = filtered_args[1];

        PrepareForIngest prep_for_ingest
            = new PrepareForIngest(input_dir, json_list_filename, solr_url, output_dir, verbosity);
        prep_for_ingest.exec();
    }
}
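
// Example invocation (a minimal sketch only: the usage string suggests this class is normally
// driven through a RUN.bash wrapper; the master URL, JAR name, and input paths below are
// hypothetical and will differ per cluster):
//
//   spark-submit --class org.hathitrust.PrepareForIngest \
//       --master local[4] \
//       htrc-ef-ingest.jar \
//       -v 1 -o solr-json-out pd-ef-json-files pd-file-listing.txt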