source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java@30996

Last change on this file since 30996 was 30996, checked in by davidb, 7 years ago

Code refactoring

  • Property svn:executable set to *
File size: 5.8 KB
package org.hathitrust;

import java.io.Serializable;
import org.apache.commons.cli.*;

import org.apache.spark.api.java.*;
import org.apache.spark.util.DoubleAccumulator;
import org.hathitrust.extractedfeatures.PagedJSON;
import org.apache.spark.SparkConf;

public class PrepareForIngest implements Serializable
{
    private static final long serialVersionUID = 1L;

    // Follows the advice on the number of partitions to use given in the
    // "Parallelized collections" section of:
    //   https://spark.apache.org/docs/2.0.1/programming-guide.html
    //
    // For a more detailed discussion see:
    //   http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

    public static final int NUM_CORES = 6;
    public static final int NUM_PARTITIONS = 2 * NUM_CORES; // default would appear to be 2
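    // (For reference, the Spark programming guide linked above typically
    // recommends 2-4 partitions per CPU core, which motivates 2*NUM_CORES here.)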

    protected String _input_dir;
    protected String _json_list_filename;
    protected String _solr_url;
    protected String _output_dir;

    protected int _verbosity;

    public PrepareForIngest(String input_dir, String json_list_filename,
                            String solr_url, String output_dir, int verbosity)
    {
        _input_dir = input_dir;
        // Fall back to the input directory when no explicit JSON file-list is given
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

        _solr_url = solr_url;
        _output_dir = output_dir;
        _verbosity = verbosity;
    }

    public void exec()
    {
        String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
        spark_app_name += " [" + _json_list_filename + "]";

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        if (_verbosity >= 2) {
            System.out.println("Default Minimum Partitions: " + jsc.defaultMinPartitions());
            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
        }

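        // cache() the file-list RDD: it is traversed twice below, once by
        // count() and once by the foreach() that does the per-volume work.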
        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, NUM_PARTITIONS).cache();

        long num_volumes = json_list_data.count();
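        // Percentage of the total that each processed volume contributes to
        // the progress accumulator.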
        double per_vol = 100.0 / (double)num_volumes;

        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");

        PagedJSON paged_json = new PagedJSON(_input_dir, _solr_url, _output_dir, _verbosity, progress_accum, per_vol);
        //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();

        // Process each listed volume as a side effect: PagedJSON performs the
        // Solr posting and/or file output, updating the progress accumulator.
        json_list_data.foreach(paged_json);

/*
        System.out.println("");
        System.out.println("############");
        System.out.println("# Progress Accumulator: " + progress_accum.value());
        System.out.println("############");
        System.out.println("");
*/

        //long num_ids = json_ids.count();
        long num_ids = num_volumes;

        System.out.println("");
        System.out.println("############");
        System.out.println("# Number of page ids: " + num_ids);
        System.out.println("############");
        System.out.println("");

        /*
        if (_output_dir != null) {
            String rdd_save_file = "rdd-solr-json-page-files";
            json_ids.saveAsTextFile(rdd_save_file);
            System.out.println("############");
            System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
            System.out.println("# " + rdd_save_file);
            System.out.println("############");
            System.out.println("");
        }
        */

        jsc.close();
    }

    public static void print_usage(HelpFormatter formatter, Options options)
    {
        formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
    }

    public static void main(String[] args)
    {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        Option output_dir_opt = new Option("o", "output-dir", true,
                "If specified, save BZipped Solr JSON files to this directory");
        output_dir_opt.setRequired(false);
        options.addOption(output_dir_opt);

        Option solr_url_opt = new Option("u", "solr-url", true,
                "If specified, the URL to post the Solr JSON data to");
        solr_url_opt.setRequired(false);
        options.addOption(solr_url_opt);

        Option read_only_opt = new Option("r", "read-only", false,
                "Used to initiate a run where the files are all read in, but nothing is ingested/saved");
        read_only_opt.setRequired(false);
        options.addOption(read_only_opt);
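        // Illustrative invocation via the wrapper script named in print_usage(),
        // with hypothetical paths:
        //   RUN.bash -v 1 --output-dir solr-json-out my-input-dir json-filelist.txt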

        // Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
        CommandLineParser parser = new GnuParser();
        //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

        HelpFormatter formatter = new HelpFormatter();
        CommandLine cmd = null;

        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            print_usage(formatter, options);
            System.exit(1);
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "0");
        int verbosity = Integer.parseInt(verbosity_str);

        String output_dir = cmd.getOptionValue("output-dir", null);
        String solr_url = cmd.getOptionValue("solr-url", null);
        boolean read_only = cmd.hasOption("read-only");

        String[] filtered_args = cmd.getArgs();

        if (filtered_args.length != 2) {
            print_usage(formatter, options);
            System.exit(1);
        }

        if (!read_only && ((output_dir == null) && (solr_url == null))) {
            System.err.println("Need to specify either --solr-url or --output-dir, otherwise generated files are not ingested/saved");
            print_usage(formatter, options);
            System.exit(1);
        }
        if (read_only) {
            // For this case, need to ensure solr-url and output-dir are null
            output_dir = null;
            solr_url = null;
        }

        String input_dir = filtered_args[0];
        String json_list_filename = filtered_args[1];

        PrepareForIngest prep_for_ingest
            = new PrepareForIngest(input_dir, json_list_filename, solr_url, output_dir, verbosity);
        prep_for_ingest.exec();
    }
}