source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java@ 31008

Last change on this file since 31008 was 31008, checked in by davidb, 7 years ago

Additional detail added into Spark app name

  • Property svn:executable set to *
File size: 7.8 KB
package org.hathitrust.extractedfeatures;

import java.io.Serializable;

import org.apache.commons.cli.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.util.DoubleAccumulator;

import org.hathitrust.extractedfeatures.PerPageJSONFlatmap;
import org.json.JSONObject;
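
/**
 * Spark driver for converting HathiTrust Extracted Features JSON files into
 * Solr documents, either one per volume or one per page, which are then
 * POSTed to Solr (--solr-url) and/or saved to disk (--output-dir).
 */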
public class ProcessForSolrIngest implements Serializable
{
    private static final long serialVersionUID = 1L;

    // Guidance on the number of partitions to use is given in the
    // "Parallelized collections" section of:
    //   https://spark.apache.org/docs/2.0.1/programming-guide.html
    //
    // For a more detailed discussion see:
    //   http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

    public static final int NUM_CORES = 6;
    public static final int NUM_PARTITIONS = 2 * NUM_CORES; // the default would appear to be 2
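
    // Note: parallelism can also be influenced at submit time (an assumption,
    // not wired into this code), e.g.:
    //   spark-submit --conf spark.default.parallelism=12 ...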

    protected String _input_dir;
    protected String _json_list_filename;
    protected String _solr_url;
    protected String _output_dir;

    protected int _verbosity;

    public ProcessForSolrIngest(String input_dir, String json_list_filename,
                                String solr_url, String output_dir, int verbosity)
    {
        _input_dir = input_dir;
        // Fall back to the input directory when no explicit file list is given
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

        _solr_url = solr_url;
        _output_dir = output_dir;
        _verbosity = verbosity;
    }
43
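    /**
     * Per-volume processing: each line of the JSON file list names one
     * volume; PerVolumeJSON reads, converts, and ingests/saves it in a
     * single foreach pass.
     */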
    public void execPerVolume()
    {
        String spark_app_name = "[Per Volume] Extract Features: Process for Solr Ingest";
        spark_app_name += " [" + _json_list_filename + "]";

        if (_solr_url != null) {
            spark_app_name += " solr_url=" + _solr_url;
        }

        if (_output_dir != null) {
            spark_app_name += " output_dir=" + _output_dir;
        }

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        if (_verbosity >= 2) {
            System.out.println("Default Minimum Partitions: " + jsc.defaultMinPartitions());
            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
        }

        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, NUM_PARTITIONS).cache();

        long num_volumes = json_list_data.count();
        double per_vol = 100.0 / (double)num_volumes;

        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");

        System.err.println("****##### _input_dir = " + _input_dir); // debug trace

        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir, _solr_url, _output_dir, _verbosity, progress_accum, per_vol);

        json_list_data.foreach(per_vol_json);

        long num_ids = num_volumes;

        System.out.println("");
        System.out.println("############");
        System.out.println("# Number of volume ids: " + num_ids);
        System.out.println("############");
        System.out.println("");

        jsc.close();
    }
95
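    /**
     * Per-page processing: volumes are flat-mapped into one JSON object per
     * page, and PerPageJSONForeach then ingests/saves each page record.
     */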
    public void execPerPage()
    {
        String spark_app_name = "[Per Page] Extract Features: Process for Solr Ingest";
        spark_app_name += " [" + _json_list_filename + "]";

        if (_solr_url != null) {
            spark_app_name += " solr_url=" + _solr_url;
        }

        if (_output_dir != null) {
            spark_app_name += " output_dir=" + _output_dir;
        }

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        if (_verbosity >= 2) {
            System.out.println("Default Minimum Partitions: " + jsc.defaultMinPartitions());
            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
        }

        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, NUM_PARTITIONS).cache();

        long num_volumes = json_list_data.count();
        double per_vol = 100.0 / (double)num_volumes;

        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");

        PerPageJSONFlatmap paged_json = new PerPageJSONFlatmap(_input_dir, _solr_url, _output_dir, _verbosity, progress_accum, per_vol);
        JavaRDD<JSONObject> json_per_page_ids = json_list_data.flatMap(paged_json).cache();

        PerPageJSONForeach paged_json_foreach = new PerPageJSONForeach(_input_dir, _solr_url, _output_dir, _verbosity, progress_accum, per_vol);
        json_per_page_ids.foreach(paged_json_foreach);

        /*
        System.out.println("");
        System.out.println("############");
        System.out.println("# Progress Accumulator: " + progress_accum.value());
        System.out.println("############");
        System.out.println("");
        */

        long num_page_ids = json_per_page_ids.count();

        System.out.println("");
        System.out.println("############");
        System.out.println("# Number of page ids: " + num_page_ids);
        System.out.println("############");
        System.out.println("");

        /*
        if (_output_dir != null) {
            String rdd_save_file = "rdd-solr-json-page-files";
            json_per_page_ids.saveAsTextFile(rdd_save_file);
            System.out.println("############");
            System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
            System.out.println("# " + rdd_save_file);
            System.out.println("############");
            System.out.println("");
        }
        */

        jsc.close();
    }

    public static void print_usage(HelpFormatter formatter, Options options)
    {
        formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
    }

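    // Illustrative launch command (assumption: the exact jar/script names vary
    // by deployment; see RUN.bash in this project):
    //   spark-submit --class org.hathitrust.extractedfeatures.ProcessForSolrIngest \
    //       <project-jar> --output-dir json-out input-dir json-filelist.txt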
    public static void main(String[] args) {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        Option output_dir_opt = new Option("o", "output-dir", true,
                "If specified, save BZipped Solr JSON files to this directory");
        output_dir_opt.setRequired(false);
        options.addOption(output_dir_opt);

        Option solr_url_opt = new Option("u", "solr-url", true,
                "If specified, the URL to post the Solr JSON data to");
        solr_url_opt.setRequired(false);
        options.addOption(solr_url_opt);

        Option read_only_opt = new Option("r", "read-only", false,
                "Used to initiate a run where the files are all read in, but nothing is ingested/saved");
        read_only_opt.setRequired(false);
        options.addOption(read_only_opt);

        // Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
        CommandLineParser parser = new GnuParser();
        //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

        HelpFormatter formatter = new HelpFormatter();
        CommandLine cmd = null;

        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            print_usage(formatter, options);
            System.exit(1);
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "0");
        int verbosity = Integer.parseInt(verbosity_str);

        String output_dir = cmd.getOptionValue("output-dir", null);
        String solr_url = cmd.getOptionValue("solr-url", null);
        boolean read_only = cmd.hasOption("read-only");

        String[] filtered_args = cmd.getArgs();

        if (filtered_args.length != 2) {
            print_usage(formatter, options);
            System.exit(1);
        }

        if (!read_only && (output_dir == null) && (solr_url == null)) {
            System.err.println("Need to specify either --solr-url or --output-dir, otherwise the generated files are not ingested/saved");
            print_usage(formatter, options);
            System.exit(1);
        }
        if (read_only) {
            // For this case, ensure solr-url and output-dir are both null
            output_dir = null;
            solr_url = null;
        }

        String input_dir = filtered_args[0];
        String json_list_filename = filtered_args[1];

        ProcessForSolrIngest prep_for_ingest
            = new ProcessForSolrIngest(input_dir, json_list_filename, solr_url, output_dir, verbosity);

        //prep_for_ingest.execPerVolume();
        prep_for_ingest.execPerPage();
    }
}