source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java@ 30984

Last change on this file since 30984 was 30984, checked in by davidb, 7 years ago

Introduction of Spark accumulator to measure progress. Output of POST read in and status checked for

  • Property svn:executable set to *
File size: 5.9 KB
Line 
1package org.hathitrust;
2
3import java.io.Serializable;
4import org.apache.commons.cli.*;
5
6import org.apache.spark.api.java.*;
7import org.apache.spark.util.DoubleAccumulator;
8import org.apache.spark.SparkConf;
9
10public class PrepareForIngest implements Serializable
11{
12 private static final long serialVersionUID = 1L;
13
14 public static final int NUM_PARTITIONS = 6; // default would appear to be 2
15
16 protected String _input_dir;
17 protected String _json_list_filename;
18 protected String _solr_url;
19 protected String _output_dir;
20
21 protected int _verbosity;
22
23 public PrepareForIngest(String input_dir, String json_list_filename,
24 String solr_url, String output_dir, int verbosity)
25 {
26 _input_dir = input_dir;
27 _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;
28
29 _solr_url = solr_url;
30 _output_dir = output_dir;
31 _verbosity = verbosity;
32 }
33
34 public void exec()
35 {
36 String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
37 spark_app_name += " [" + _json_list_filename + "]";
38
39 SparkConf conf = new SparkConf().setAppName(spark_app_name);
40 JavaSparkContext jsc = new JavaSparkContext(conf);
41
42 if (_verbosity >= 2) {
43 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
44 System.out.println("Default Parallelism: " + jsc.defaultParallelism());
45 }
46
47 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
48
49 long num_volumes = json_list_data.count();
50 double per_vol = 100.0/(double)num_volumes;
51
52 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("ProgressPercent");
53
54 //sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
55 // ...
56 // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
57
58 //accum.value();
59
60 PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
61 JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
62
63 long num_ids = json_ids.count();
64 System.out.println("");
65 System.out.println("############");
66 System.out.println("# Number of page ids: " + num_ids);
67 System.out.println("############");
68 System.out.println("");
69
70 if (_output_dir != null) {
71 String rdd_save_file = "rdd-solr-json-page-files";
72 json_ids.saveAsTextFile(rdd_save_file);
73 System.out.println("############");
74 System.out.println("# Saved RDD of Solr JSON page files, top-level, as:");
75 System.out.println("# " + rdd_save_file);
76 System.out.println("############");
77 System.out.println("");
78 }
79
80 jsc.close();
81 }
82
83 public static void print_usage(HelpFormatter formatter, Options options)
84 {
85 formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
86 }
87 public static void main(String[] args) {
88
89
90 Options options = new Options();
91
92 //.withType(Integer.class)
93/*
94 options.addOption(OptionBuilder.withLongOpt("verbosity")
95 .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]")
96 .hasArg()
97 .withArgName("v")
98 .isRequired(false)
99 .create());
100*/
101 //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use");
102 //num_cores_opt.setRequired(false);
103 //options.addOption(num_cores_opt);
104
105 Option verbosity_opt = new Option("v", "verbosity", true,
106 "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
107 verbosity_opt.setRequired(false);
108 options.addOption(verbosity_opt);
109
110 Option output_dir_opt = new Option("o", "output-dir", true,
111 "If specified, save BZipped Solr JSON files to this directory");
112 output_dir_opt.setRequired(false);
113 options.addOption(output_dir_opt);
114
115 Option solr_url_opt = new Option("u", "solr-url", true,
116 "If specified, the URL to post the Solr JSON data to");
117 solr_url_opt.setRequired(false);
118 options.addOption(solr_url_opt);
119
120 Option dry_run_opt = new Option("r", "dry-run", false,
121 "Used to initiate a 'dry-run' where the files are all read in, but nothing is ingested/saved");
122 dry_run_opt.setRequired(false);
123 options.addOption(dry_run_opt);
124
125 // need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
126 CommandLineParser parser = new GnuParser();
127 //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above
128
129 HelpFormatter formatter = new HelpFormatter();
130 CommandLine cmd = null;
131
132 try {
133 cmd = parser.parse(options, args);
134 }
135 catch (ParseException e) {
136 System.err.println(e.getMessage());
137 print_usage(formatter,options);
138 System.exit(1);
139 //return; // prevents 'cmd may not be assigned' compiler error in Eclipse
140 }
141
142 //value = ((Integer)cmdLine.getParsedOptionValue("num-cores")).intValue();
143 //value = ((Integer)cmdLine.getOptionValue("num-cores","2")).intValue();
144
145 //cmd.hasOption("json-filelist")
146
147 String verbosity_str = cmd.getOptionValue("verbosity","0");
148 int verbosity = Integer.parseInt(verbosity_str);
149
150 String output_dir = cmd.getOptionValue("output-dir",null);
151 String solr_url = cmd.getOptionValue("solr-url",null);
152 boolean dry_run = cmd.hasOption("dry-run");
153
154 String[] filtered_args = cmd.getArgs();
155
156 if (filtered_args.length != 2) {
157 print_usage(formatter,options);
158 System.exit(1);
159 }
160
161 if (!dry_run && ((output_dir == null) && (solr_url==null))) {
162 System.err.println("Need to specify either --solr-url or --output-dir otherwise generated files are not ingested/saved");
163 print_usage(formatter,options);
164 System.exit(1);
165 }
166
167 String input_dir = filtered_args[0];
168 String json_list_filename = filtered_args[1];
169 //String output_dir = filtered_args[2];
170
171 PrepareForIngest prep_for_ingest
172 = new PrepareForIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);
173 prep_for_ingest.exec();
174
175 }
176}
Note: See TracBrowser for help on using the repository browser.