source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java@31368

Last change on this file since 31368 was 31368, checked in by davidb, 7 years ago

downsample-100 added

  • Property svn:executable set to *
File size: 10.9 KB
package org.hathitrust.extractedfeatures;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.cli.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.DoubleAccumulator;

import scala.Tuple2;
public class ProcessForCatalogLangCount implements Serializable
{
    private static final long serialVersionUID = 1L;

    protected static final int DEFAULT_FILES_PER_PARTITION = 3000;

    protected String _input_dir;
    protected String _json_list_filename;

    protected int _verbosity;

    public ProcessForCatalogLangCount(String input_dir, String json_list_filename, int verbosity)
    {
        _input_dir = input_dir;
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

        _verbosity = verbosity;
    }

    protected String generateSparkAppName(String exec_mode)
    {
        String spark_app_name = "[" + exec_mode + "] Extracted Features: Process for Catalog Language Labels";
        spark_app_name += " [" + _json_list_filename + "]";

        return spark_app_name;
    }

    public void execCatalogLangCountSparkDirect()
    {
        SparkConf conf = new SparkConf().setAppName("Spark-Direct + Per Volume");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
        String output_directory = "catalog-lang-" + filename_root + "-out";
        if (ClusterFileIO.exists(output_directory))
        {
            System.err.println("Error: " + output_directory + " already exists; Spark is unable to write the output data");
            jsc.close();
            System.exit(1);
        }

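        // files-per-partition, like the other wcsa-ef-ingest.* settings below, is read
        // from a JVM system property, so it can be set with -D on the command line or
        // through the properties file loaded in main()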
        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.files-per-partition", DEFAULT_FILES_PER_PARTITION);

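        // Read the list of JSON files (one filename per line) and cache it, since the
        // RDD is used twice: once for count() and again for repartition()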
        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
        json_list_data.setName("JSON-file-list");

        long num_volumes = json_list_data.count();
        double per_vol = 100.0/(double)num_volumes;

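        // Size partitions so each holds roughly files_per_partition entries;
        // e.g. 300,000 volumes at the default 3,000 files per partition gives 101 partitions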
        int num_partitions = (int)(num_volumes/files_per_partition)+1;

        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
        json_list_data_rp.setName("JSON-file-list--repartitioned");

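        // Driver-side accumulator the per-volume workers presumably add 'per_vol' to,
        // giving a rough percent-complete figure in the Spark UI while the job runs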
        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");

        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

        PerVolumeCatalogLangStreamFlatmap volume_catalog_langfreq_flatmap
            = new PerVolumeCatalogLangStreamFlatmap(_input_dir, _verbosity,
                                                    per_vol_progress_accum, per_vol,
                                                    strict_file_io);
        JavaRDD<String> catalog_lang_list = json_list_data_rp.flatMap(volume_catalog_langfreq_flatmap);
        catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
        catalog_lang_list.setName("catalog-lang-stream");

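        // Classic word-count pattern: pair each language label with a count of 1,
        // then sum the counts per label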
        JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
        catalog_lang_pairs.setName("single-catalog-lang-count");

        JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
        catalog_lang_counts.setName("catalog-lang-frequency");

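        // sortByKey() only sorts on the key, so swap (lang,count) to (count,lang),
        // sort descending, then swap back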
        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
            = catalog_lang_counts.mapToPair(item -> item.swap());
        catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
            = catalog_lang_counts_swapped_pair.sortByKey(false, num_partitions);
        catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

        JavaPairRDD<String, Long> catalog_lang_count_sorted
            = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
        catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

        catalog_lang_count_sorted.saveAsTextFile(output_directory);
        jsc.close();
    }

    public void sampleDown10000()
    {
        String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume: Downsample 10000");

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

        String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);

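        // sample(withReplacement=false, fraction=0.0001, seed=42): keep roughly
        // 1 in 10,000 records, with a fixed seed so the sample is reproducible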
        JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false, 0.0001, 42);

        String output_directory = "packed-ef-10000";
        json_text_sample_rdd.saveAsTextFile(output_directory);

        jsc.close();
    }

    public void sampleDown100()
    {
        String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume: Downsample 100");

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

        String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);

        JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false, 0.01, 42);

        JavaPairRDD<Text, Text> json_text_sample_repart_rdd = json_text_sample_rdd.repartition(120);

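        // Unlike sampleDown10000() above, write the sample back out as a Hadoop
        // SequenceFile (rather than plain text) so the <Text,Text> pairs can be
        // re-read directly by jobs such as execCatalogLangCount()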
        String output_directory = "packed-full-ef-100";
        //json_text_sample_repart_rdd.saveAsTextFile(output_directory);
        //json_text_sample_repart_rdd.saveAsSequenceFile(output_directory);
        json_text_sample_repart_rdd.saveAsHadoopFile(output_directory, Text.class, Text.class, SequenceFileOutputFormat.class);

        jsc.close();
    }

    public void execCatalogLangCount()
    {
        String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

        String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

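        // Each SequenceFile record is a <Text,Text> pair; only the value (presumably
        // the volume's Extracted Features JSON) is needed here, so the key is dropped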
        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
        //JavaRDD<Text> jsonTextRdd = input_pair_rdd.map(Tuple2::_2);
        JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);

        //JavaRDD<Text> json_text_sample_rdd = json_text_rdd.sample(false,0.0001);

        /*
        jsonTextRdd.map(
            jsonText -> "" // parse JSON text and do whatever ...
        );
        */

        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

        PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map
            = new PerVolumeCatalogLangSequenceFileMap(_input_dir, _verbosity, strict_file_io);
        JavaRDD<String> catalog_lang_list = json_text_rdd.map(volume_catalog_langfreq_map);
        //catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
        catalog_lang_list.setName("catalog-lang-stream");

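        // Same count-and-sort pipeline as in execCatalogLangCountSparkDirect() above,
        // but with the sort hard-wired to 20 partitions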
        JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
        catalog_lang_pairs.setName("single-catalog-lang-count");

        JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
        catalog_lang_counts.setName("catalog-lang-frequency");

        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
            = catalog_lang_counts.mapToPair(item -> item.swap());
        catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
            = catalog_lang_counts_swapped_pair.sortByKey(false, 20);
        catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

        JavaPairRDD<String, Long> catalog_lang_count_sorted
            = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
        catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
        String output_directory = "catalog-lang-" + filename_root + "-out";
        catalog_lang_count_sorted.saveAsTextFile(output_directory);
        jsc.close();
    }

    public static void print_usage(HelpFormatter formatter, Options options)
    {
        formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
    }

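    // A typical invocation (the file names here are illustrative only):
    //   RUN.bash -v 2 -p wcsa-ef-ingest.properties ef-json-files/ json-filelist.txt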
    public static void main(String[] args) {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        Option properties_opt = new Option("p", "properties", true,
                "Read in the specified Java properties file");
        properties_opt.setRequired(false);
        options.addOption(properties_opt);

        // Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
        CommandLineParser parser = new GnuParser();
        //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

        HelpFormatter formatter = new HelpFormatter();
        CommandLine cmd = null;

        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            print_usage(formatter, options);
            System.exit(1);
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "1");
        int verbosity = Integer.parseInt(verbosity_str);

        String property_filename = cmd.getOptionValue("properties", null);

        String[] filtered_args = cmd.getArgs();

        if (filtered_args.length != 2) {
            print_usage(formatter, options);
            System.exit(1);
        }

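        // Merge the properties file into the JVM system properties, so the
        // wcsa-ef-ingest.* settings become visible to the Integer.getInteger() and
        // Boolean.getBoolean() calls in the processing methods above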
        if (property_filename != null) {
            // try-with-resources, so the property file stream is closed even on error
            try (FileInputStream fis = new FileInputStream(property_filename);
                 BufferedInputStream bis = new BufferedInputStream(fis)) {
                System.getProperties().load(bis);
            }
            catch (FileNotFoundException e) {
                e.printStackTrace();
                System.err.println("File not found: '" + property_filename + "'. Skipping property file read");
            }
            catch (IOException e) {
                System.err.println("IO Exception for: '" + property_filename + "'. Malformed syntax? Skipping property file read");
            }
        }

        String input_dir = filtered_args[0];
        String json_list_filename = filtered_args[1];

        ProcessForCatalogLangCount prep_for_lang
            = new ProcessForCatalogLangCount(input_dir, json_list_filename, verbosity);

        //prep_for_lang.execCatalogLangCount();
        prep_for_lang.sampleDown100();
    }
}