Changeset 30945 for other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java
- Timestamp: 2016-10-26T15:37:24+13:00 (8 years ago)
- File: 1 edited
Legend: Unmodified | Added | Removed
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java
r30944 r30945 1 1 package org.hathitrust; 2 2 3 import java.io.IOException;4 3 import java.io.Serializable; 5 import java.nio.charset.StandardCharsets;6 import java.nio.file.Files;7 import java.nio.file.Path;8 import java.nio.file.Paths;9 import java.util.List;10 11 4 import org.apache.commons.cli.*; 12 5 13 6 import org.apache.spark.api.java.*; 14 7 import org.apache.spark.SparkConf; 15 import org.apache.spark.api.java.function.Function;16 8 17 9 public class PrepareForIngest implements Serializable … … 19 11 private static final long serialVersionUID = 1L; 20 12 13 public static final int NUM_PARTITIONS = 6; // default would appear to be 2 14 21 15 protected String _input_dir; 22 16 protected String _json_list_filename; … … 40 34 SparkConf conf = new SparkConf().setAppName(spark_app_name); 41 35 JavaSparkContext jsc = new JavaSparkContext(conf); 42 //ClusterFileIO.init(_input_dir);43 36 44 // Check output directory exists, and create it if not 45 46 47 if (_verbosity >= 1) { 37 if (_verbosity >= 2) { 48 38 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 49 39 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 50 40 } 51 41 52 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, 6).cache();42 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 53 43 54 JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir,_verbosity)); 44 PagedJSON paged_json = new PagedJSON(_input_dir,_output_dir,_verbosity); 45 JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 55 46 47 json_ids.saveAsTextFile("foo"); 56 48 57 //long numAs = json_list_data.filter(new ContainsA()).count();58 59 60 /*61 long numBs = json_list_data.filter(new Function<String, Boolean>() {62 public Boolean call(String s) { return s.contains("b"); }63 }).count();64 65 System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs);66 */67 49 long num_ids = 
json_ids.count(); 68 50 System.out.println(""); … … 81 63 82 64 //.withType(Integer.class) 83 65 /* 84 66 options.addOption(OptionBuilder.withLongOpt("verbosity") 85 67 .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]") … … 88 70 .isRequired(false) 89 71 .create()); 90 72 */ 91 73 //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use"); 92 74 //num_cores_opt.setRequired(false); 93 75 //options.addOption(num_cores_opt); 94 76 77 Option verbosity_opt = new Option("v", "verbosity", true, 78 "Set to control the level of debugging output [0=none, 1=some, 2=lots]"); 79 verbosity_opt.setRequired(false); 80 options.addOption(verbosity_opt); 81 95 82 //CommandLineParser parser = new DefaultParser(); // 1.3 and above 96 CommandLineParser parser = new GnuParser(); 83 84 // need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark 85 CommandLineParser parser = new GnuParser(); 97 86 HelpFormatter formatter = new HelpFormatter(); 98 87 CommandLine cmd; … … 112 101 113 102 //cmd.hasOption("json-filelist") 103 114 104 String verbosity_str = cmd.getOptionValue("verbosity","0"); 115 105 int verbosity = Integer.parseInt(verbosity_str); 116 117 //System.out.println(inputFilePath);118 //System.out.println(outputFilePath);119 120 106 121 107 String[] filtered_args = cmd.getArgs(); … … 129 115 String output_dir = filtered_args[2]; 130 116 131 132 //String json_list_filename = cmd.getArgs()[0]; // args[0];133 //String json_list_filename = args[0];134 //int num_cores = 2;135 136 117 PrepareForIngest prep_for_ingest = new PrepareForIngest(input_dir,json_list_filename,output_dir,verbosity); 137 118 prep_for_ingest.exec();
Note: See TracChangeset for help on using the changeset viewer.