package org.hathitrust;

import java.io.Serializable;

import org.apache.commons.cli.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;

public class PrepareForIngest implements Serializable
{
    private static final long serialVersionUID = 1L;

    protected String _input_dir;
    protected String _json_list_filename;
    protected String _output_dir;
    protected int    _verbosity;

    public PrepareForIngest(String input_dir, String json_list_filename,
                            String output_dir, int verbosity)
    {
        _input_dir = input_dir;
        // If no explicit file list is given, fall back to the input directory
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;
        _output_dir = output_dir;
        _verbosity = verbosity;
    }

    public void exec()
    {
        SparkConf conf = new SparkConf().setAppName("HTRC-EF: Prepare for Solr Ingest");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Each line of the file list names one Extracted Features JSON file;
        // PagedJSON expands each file into the IDs it contains
        JavaRDD<String> json_list_data = sc.textFile(_json_list_filename).cache();
        JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir));

        long num_ids = json_ids.count();
        System.out.println("");
        System.out.println("############");
        System.out.println("# Number of IDs: " + num_ids);
        System.out.println("############");
        System.out.println("");

        sc.close();
    }

    public static void main(String[] args)
    {
        Options options = new Options();

        options.addOption(OptionBuilder.withLongOpt("verbosity")
                .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]")
                .hasArg()
                .withArgName("v")
                .isRequired(false)
                .create());

        //CommandLineParser parser = new DefaultParser(); // commons-cli 1.3 and above
        CommandLineParser parser = new GnuParser();       // commons-cli 1.2
        HelpFormatter formatter = new HelpFormatter();

        CommandLine cmd;
        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            formatter.printHelp("RUN.bash/RUN.bat [options] json-file-list.txt input-dir output-dir", options);
            System.exit(1);
            return;
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "0");
        int verbosity = Integer.parseInt(verbosity_str);

        // 'json-file-list.txt' contains a list of JSON files, one per line,
        // each of which uses the HathiTrust Extracted Feature JSON format
        String[] filtered_args = cmd.getArgs();
        if (filtered_args.length != 3) {
            formatter.printHelp("RUN.bash/RUN.bat [options] json-file-list.txt input-dir output-dir", options);
            System.exit(1);
        }

        String json_list_filename = filtered_args[0];
        String input_dir  = filtered_args[1];
        String output_dir = filtered_args[2];

        PrepareForIngest prep_for_ingest
            = new PrepareForIngest(input_dir, json_list_filename, output_dir, verbosity);
        prep_for_ingest.exec();
    }
}
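// For reference, exec() above depends on a companion class, PagedJSON,
// defined elsewhere in this package. The block-commented sketch below is an
// illustrative assumption about its contract, not the project's actual
// implementation: a serializable FlatMapFunction<String, String> that maps
// one line of the file list to the IDs that JSON file contributes. The
// Iterable<String> return type assumes the Spark 1.x Java API; under
// Spark 2.x, call() would return Iterator<String> instead.
/*
import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;

class PagedJSON implements FlatMapFunction<String, String>
{
    private static final long serialVersionUID = 1L;

    protected String _input_dir;

    public PagedJSON(String input_dir) {
        _input_dir = input_dir;
    }

    public Iterable<String> call(String json_filename) {
        // Hypothetical placeholder behaviour: a real implementation would
        // open <_input_dir>/<json_filename>, parse it as a HathiTrust
        // Extracted Features file, and emit one ID per volume/page
        return Arrays.asList(_input_dir + "/" + json_filename);
    }
}
*/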