source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java@ 31369

Last change on this file since 31369 was 31369, checked in by davidb, 7 years ago

Trial new save

  • Property svn:executable set to *
File size: 11.5 KB
package org.hathitrust.extractedfeatures;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.cli.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.DoubleAccumulator;
import scala.Tuple2;

import org.apache.spark.SparkConf;

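/**
 * Spark driver that derives catalog language-label counts from HathiTrust
 * Extracted Features volumes. Volumes are read either from a list of
 * per-volume JSON files or from a packed SequenceFile of JSON records; each
 * volume is mapped to its catalog language, the languages are counted, and
 * the counts are written out sorted by descending frequency. The class also
 * carries two utility jobs for downsampling the packed SequenceFile data.
 */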
public class ProcessForCatalogLangCount implements Serializable
{
	private static final long serialVersionUID = 1L;

	protected static final int DEFAULT_FILES_PER_PARTITION = 3000;

	protected String _input_dir;
	protected String _json_list_filename;

	protected int _verbosity;

	public ProcessForCatalogLangCount(String input_dir, String json_list_filename, int verbosity)
	{
		_input_dir = input_dir;
		_json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

		_verbosity = verbosity;
	}

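	// Builds the Spark application name shown in the UI from the execution
	// mode and the JSON file-list being processed.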
	protected String generateSparkAppName(String exec_mode)
	{
		String spark_app_name = "[" + exec_mode + "] Extracted Features: Process for Catalog Language Labels";
		spark_app_name += " [" + _json_list_filename + "]";

		return spark_app_name;
	}

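	/**
	 * Counts catalog language labels by streaming the individual per-volume
	 * JSON files named in _json_list_filename. The file list is repartitioned
	 * so that roughly DEFAULT_FILES_PER_PARTITION files land in each partition
	 * (overridable through the wcsa-ef-ingest.files-per-partition system
	 * property), and a DoubleAccumulator reports percentage progress as
	 * volumes complete. Results are saved as text to a
	 * "catalog-lang-{file-list-root}-out" directory, which must not already exist.
	 */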
	public void execCatalogLangCountSparkDirect()
	{
		SparkConf conf = new SparkConf().setAppName("Spark-Direct + Per Volume");
		JavaSparkContext jsc = new JavaSparkContext(conf);

		String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
		String output_directory = "catalog-lang-" + filename_root + "-out";
		if (ClusterFileIO.exists(output_directory))
		{
			System.err.println("Error: " + output_directory + " already exists. Spark unable to write output data");
			jsc.close();
			System.exit(1);
		}

		//int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
		int files_per_partition = Integer.getInteger("wcsa-ef-ingest.files-per-partition", DEFAULT_FILES_PER_PARTITION);

		JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
		json_list_data.setName("JSON-file-list");

		long num_volumes = json_list_data.count();
		double per_vol = 100.0/(double)num_volumes;

		int num_partitions = (int)(num_volumes/files_per_partition)+1;

		JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
		json_list_data_rp.setName("JSON-file-list--repartitioned");

		DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");

		boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

		PerVolumeCatalogLangStreamFlatmap volume_catalog_langfreq_flatmap
			= new PerVolumeCatalogLangStreamFlatmap(_input_dir, _verbosity,
					per_vol_progress_accum, per_vol,
					strict_file_io);
		JavaRDD<String> catalog_lang_list = json_list_data_rp.flatMap(volume_catalog_langfreq_flatmap);
		catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
		catalog_lang_list.setName("catalog-lang-stream");

		JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
		catalog_lang_pairs.setName("single-catalog-lang-count");

		JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
		catalog_lang_counts.setName("catalog-lang-frequency");

		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
			= catalog_lang_counts.mapToPair(item -> item.swap());
		catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
			= catalog_lang_counts_swapped_pair.sortByKey(false, num_partitions);
		catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

		JavaPairRDD<String, Long> catalog_lang_count_sorted
			= catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
		catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

		catalog_lang_count_sorted.saveAsTextFile(output_directory);
		jsc.close();
	}

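	/**
	 * Utility job: takes a 0.01% sample (fixed seed 42) of the packed
	 * Extracted Features SequenceFile data and writes the sampled records
	 * out as text to "packed-ef-10000".
	 */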
	public void sampleDown10000()
	{
		String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume: Downsample 10000");

		SparkConf conf = new SparkConf().setAppName(spark_app_name);
		JavaSparkContext jsc = new JavaSparkContext(conf);
		jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

		String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

		JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);

		JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false, 0.0001, 42);

		String output_directory = "packed-ef-10000";
		json_text_sample_rdd.saveAsTextFile(output_directory);

		jsc.close();
	}

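	// Converts a (Text, Text) SequenceFile record into a pair of plain Strings
	// (despite the name, it maps from Writable Text values to Strings) so the
	// sampled data can be re-saved through the Hadoop output format below.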
	public static class ConvertToWritableTypes implements PairFunction<Tuple2<Text, Text>, String, String> {
		private static final long serialVersionUID = 1L;

		public Tuple2<String, String> call(Tuple2<Text, Text> record) {
			return new Tuple2<String, String>(record._1.toString(), record._2.toString());
		}
	}

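	/**
	 * Utility job: takes a 1% sample (fixed seed 42) of the packed Extracted
	 * Features SequenceFile data, repartitions it to 120 partitions, converts
	 * the records to Strings and saves them with SequenceFileOutputFormat
	 * into "packed-full-ef-100".
	 */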
	public void sampleDown100()
	{
		String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume: Downsample 100");

		SparkConf conf = new SparkConf().setAppName(spark_app_name);
		JavaSparkContext jsc = new JavaSparkContext(conf);
		jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

		String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

		JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);

		JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false, 0.01, 42);

		JavaPairRDD<Text, Text> json_text_sample_repart_rdd = json_text_sample_rdd.repartition(120);

		String output_directory = "packed-full-ef-100";
		//json_text_sample_repart_rdd.saveAsTextFile(output_directory);
		//json_text_sample_repart_rdd.saveAsSequenceFile(output_directory);
		//json_text_sample_repart_rdd.saveAsHadoopFile(output_directory, Text.class, Text.class, SequenceFileOutputFormat.class);

		JavaPairRDD<String,String> result = json_text_sample_repart_rdd.mapToPair(new ConvertToWritableTypes());
		result.saveAsHadoopFile(output_directory, String.class, String.class, SequenceFileOutputFormat.class);

		jsc.close();
	}

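	/**
	 * Counts catalog language labels by reading volumes from the packed
	 * SequenceFile data rather than from individual JSON files: each record's
	 * JSON value is mapped to its catalog language, the languages are counted
	 * and sorted by descending frequency, and the result is saved as text to
	 * "catalog-lang-{file-list-root}-out".
	 */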
	public void execCatalogLangCount()
	{
		String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");

		SparkConf conf = new SparkConf().setAppName(spark_app_name);
		JavaSparkContext jsc = new JavaSparkContext(conf);
		jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

		String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

		JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
		//JavaRDD<Text> jsonTextRdd = input_pair_rdd.map(Tuple2::_2);
		JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);

		//JavaRDD<Text> json_text_sample_rdd = json_text_rdd.sample(false,0.0001);

		/*
		jsonTextRdd.map(
			jsonText -> "" // parse JSON text and do whatever ...
		);
		*/

		boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

		PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map
			= new PerVolumeCatalogLangSequenceFileMap(_input_dir, _verbosity, strict_file_io);
		JavaRDD<String> catalog_lang_list = json_text_rdd.map(volume_catalog_langfreq_map);
		//catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
		catalog_lang_list.setName("catalog-lang-stream");

		JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
		catalog_lang_pairs.setName("single-catalog-lang-count");

		JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
		catalog_lang_counts.setName("catalog-lang-frequency");

		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
			= catalog_lang_counts.mapToPair(item -> item.swap());
		catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
			= catalog_lang_counts_swapped_pair.sortByKey(false, 20);
		catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

		JavaPairRDD<String, Long> catalog_lang_count_sorted
			= catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
		catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

		String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
		String output_directory = "catalog-lang-" + filename_root + "-out";
		catalog_lang_count_sorted.saveAsTextFile(output_directory);
		jsc.close();
	}

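	// Prints command-line usage for the driver script.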
	public static void print_usage(HelpFormatter formatter, Options options)
	{
		formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
	}

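	/**
	 * Entry point: parses command-line options with Commons CLI (v1.2, as
	 * bundled with Hadoop/Spark), optionally loads a Java properties file into
	 * the system properties, then constructs the processor from the two
	 * required arguments (input directory and JSON file-list). In this
	 * revision the sampleDown100() job is the one wired up to run.
	 */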
	public static void main(String[] args) {
		Options options = new Options();

		Option verbosity_opt = new Option("v", "verbosity", true,
				"Set to control the level of debugging output [0=none, 1=some, 2=lots]");
		verbosity_opt.setRequired(false);
		options.addOption(verbosity_opt);

		Option properties_opt = new Option("p", "properties", true,
				"Read in the specified Java properties file");
		properties_opt.setRequired(false);
		options.addOption(properties_opt);

		// Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
		CommandLineParser parser = new GnuParser();
		//CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

		HelpFormatter formatter = new HelpFormatter();
		CommandLine cmd = null;

		try {
			cmd = parser.parse(options, args);
		}
		catch (ParseException e) {
			System.err.println(e.getMessage());
			print_usage(formatter, options);
			System.exit(1);
		}

		String verbosity_str = cmd.getOptionValue("verbosity", "1");
		int verbosity = Integer.parseInt(verbosity_str);

		String property_filename = cmd.getOptionValue("properties", null);

		String[] filtered_args = cmd.getArgs();

		if (filtered_args.length != 2) {
			print_usage(formatter, options);
			System.exit(1);
		}

		if (property_filename != null) {
			try {
				FileInputStream fis = new FileInputStream(property_filename);
				BufferedInputStream bis = new BufferedInputStream(fis);

				System.getProperties().load(bis);
			}
			catch (FileNotFoundException e) {
				e.printStackTrace();
				System.err.println("File not found: '" + property_filename + "'. Skipping property file read");
			}
			catch (IOException e) {
				System.err.println("IO Exception for: '" + property_filename + "'. Malformed syntax? Skipping property file read");
			}
		}

		String input_dir = filtered_args[0];
		String json_list_filename = filtered_args[1];

		ProcessForCatalogLangCount prep_for_lang
			= new ProcessForCatalogLangCount(input_dir, json_list_filename, verbosity);

		//prep_for_lang.execCatalogLangCount();
		prep_for_lang.sampleDown100();
	}
}