source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java@ 31371

Last change on this file since 31371 was 31371, checked in by davidb, 7 years ago

Trying to get saveAsSequenceFile working

  • Property svn:executable set to *
File size: 12.2 KB
Line 
1package org.hathitrust.extractedfeatures;
2
3import java.io.BufferedInputStream;
4import java.io.FileInputStream;
5import java.io.FileNotFoundException;
6import java.io.IOException;
7import java.io.Serializable;
8import org.apache.commons.cli.*;
9import org.apache.hadoop.io.Text;
10import org.apache.hadoop.io.compress.BZip2Codec;
11import org.apache.hadoop.mapreduce.OutputFormat;
12import org.apache.hadoop.mapred.SequenceFileOutputFormat;
13//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
14import org.apache.spark.api.java.*;
15import org.apache.spark.api.java.function.Function2;
16import org.apache.spark.api.java.function.PairFunction;
17import org.apache.spark.storage.StorageLevel;
18import org.apache.spark.util.DoubleAccumulator;
19import scala.Tuple2;
20
21import org.apache.spark.SparkConf;
22
23public class ProcessForCatalogLangCount implements Serializable
24{
25 private static final long serialVersionUID = 1L;
26
27 protected static final int DEFAULT_FILES_PER_PARTITION = 3000;
28
29 protected String _input_dir;
30 protected String _json_list_filename;
31
32 protected int _verbosity;
33
34 public ProcessForCatalogLangCount(String input_dir, String json_list_filename, int verbosity)
35 {
36 _input_dir = input_dir;
37 _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;
38
39 _verbosity = verbosity;
40 }
41
42 protected String generateSparkAppName(String exec_mode)
43 {
44 String spark_app_name = "[" + exec_mode + "] Extracted Features: Process for Catalog Language Labels";
45 spark_app_name += " [" + _json_list_filename + "]";
46
47 return spark_app_name;
48 }
49
50 public void execCatalogLangCountSparkDirect()
51 {
52 SparkConf conf = new SparkConf().setAppName("Spark-Direct + Per Volume");
53 JavaSparkContext jsc = new JavaSparkContext(conf);
54
55 String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
56 String output_directory = "catalog-lang-" + filename_root + "-out";
57 if (ClusterFileIO.exists(output_directory))
58 {
59 System.err.println("Error: " + output_directory + " already exists. Spark unable to write output data");
60 jsc.close();
61 System.exit(1);
62 }
63
64 //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
65 int files_per_partition = Integer.getInteger("wcsa-ef-ingest.files-per-partition", DEFAULT_FILES_PER_PARTITION);
66
67 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
68 json_list_data.setName("JSON-file-list");
69
70 long num_volumes = json_list_data.count();
71 double per_vol = 100.0/(double)num_volumes;
72
73 int num_partitions = (int)(num_volumes/files_per_partition)+1;
74
75 JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
76 json_list_data_rp.setName("JSON-file-list--repartitioned");
77
78 DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");
79
80 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
81
82 PerVolumeCatalogLangStreamFlatmap volume_catalog_langfreq_flatmap
83 = new PerVolumeCatalogLangStreamFlatmap(_input_dir,_verbosity,
84 per_vol_progress_accum,per_vol,
85 strict_file_io);
86 JavaRDD<String> catalog_lang_list = json_list_data_rp.flatMap(volume_catalog_langfreq_flatmap);
87 catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
88 catalog_lang_list.setName("catalog-lang-stream");
89
90
91 JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
92 catalog_lang_pairs.setName("single-catalog-lang-count");
93
94 JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
95 catalog_lang_counts.setName("catalog-lang-frequency");
96
97 JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
98 = catalog_lang_counts.mapToPair(item -> item.swap());
99 catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");
100
101 JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
102 = catalog_lang_counts_swapped_pair.sortByKey(false, num_partitions);
103 catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");
104
105 JavaPairRDD<String, Long> catalog_lang_count_sorted
106 = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
107 catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");
108
109
110 catalog_lang_count_sorted.saveAsTextFile(output_directory);
111 jsc.close();
112 }
113
114 public void sampleDown10000()
115 {
116 String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume: Downsample 10000");
117
118 SparkConf conf = new SparkConf().setAppName(spark_app_name);
119 JavaSparkContext jsc = new JavaSparkContext(conf);
120 jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
121
122 String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";
123
124 JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
125
126 JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false,0.0001,42);
127
128 String output_directory = "packed-ef-10000";
129 json_text_sample_rdd.saveAsTextFile(output_directory);
130
131
132 jsc.close();
133 }
134
135 public static class ConvertToWritableTypes implements PairFunction<Tuple2<Text, Text>, String, String> {
136 /**
137 *
138 */
139 private static final long serialVersionUID = 1L;
140
141 public Tuple2<String, String> call(Tuple2<Text, Text> record) {
142 return new Tuple2(record._1.toString(), record._2.toString());
143 }
144 }
145
146 public void sampleDown100()
147 {
148 String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume: Downsample 100");
149
150 SparkConf conf = new SparkConf().setAppName(spark_app_name);
151 JavaSparkContext jsc = new JavaSparkContext(conf);
152 jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
153
154 String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";
155
156 JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
157
158 JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false,0.01,42);
159
160 JavaPairRDD<Text, Text> json_text_sample_repart_rdd = json_text_sample_rdd.repartition(120);
161
162 //JavaPairRDD<Text, Text> json_text_sample_repart_rdd = json_text_sample_rdd.repartition(120);
163
164 String output_directory = "packed-full-ef-100";
165 //json_text_sample_repart_rdd.saveAsTextFile(output_directory);
166 //json_text_sample_repart_rdd.saveAsSequenceFile(output_directory);
167 json_text_sample_repart_rdd.saveAsHadoopFile(output_directory, Text.class, Text.class, SequenceFileOutputFormat.class);
168
169 //SequenceFileOutputFormat<Text,Text> sfof = new SequenceFileOutputFormat<Text,Text>();
170 // //sfof.setOutputCompressionClass(BZip2Codec.class);
171 // //sfof.setOutputCompressorClass(conf);
172 // json_text_sample_repart_rdd.saveAsNewAPIHadoopFile(output_directory, Text.class, Text.class, sfof);
173 //org.apache.hadoop.mapred.SequenceFileAsBinaryOutputFormat
174 //org.apache.hadoop.mapred.
175 //json_text_sample_repart_rdd.saveAsObjectFile(output_directory);
176
177 //JavaPairRDD<String,String> result = json_text_sample_repart_rdd.mapToPair(new ConvertToWritableTypes());
178 //result.saveAsHadoopFile(output_directory, String.class, String.class, SequenceFileOutputFormat.class);
179
180 jsc.close();
181 }
182 public void execCatalogLangCount()
183 {
184
185 String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");
186
187 SparkConf conf = new SparkConf().setAppName(spark_app_name);
188 JavaSparkContext jsc = new JavaSparkContext(conf);
189 jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
190
191 String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";
192
193 JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
194 //JavaRDD<Text> jsonTextRdd = input_pair_rdd.map(Tuple2::_2);
195 JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);
196
197 //JavaRDD<Text> json_text_sample_rdd = json_text_rdd.sample(false,0.0001);
198
199 /*
200 jsonTextRdd.map(
201 jsonText -> "" // parse JSON text and do whatever ...
202 );
203 */
204
205 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
206
207 PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map
208 = new PerVolumeCatalogLangSequenceFileMap(_input_dir,_verbosity,strict_file_io);
209 JavaRDD<String> catalog_lang_list = json_text_rdd.map(volume_catalog_langfreq_map);
210 //catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
211 catalog_lang_list.setName("catalog-lang-stream");
212
213
214 JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
215 catalog_lang_pairs.setName("single-catalog-lang-count");
216
217 JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
218 catalog_lang_counts.setName("catalog-lang-frequency");
219
220 JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
221 = catalog_lang_counts.mapToPair(item -> item.swap());
222 catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");
223
224 JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
225 = catalog_lang_counts_swapped_pair.sortByKey(false,20);
226 catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");
227
228 JavaPairRDD<String, Long> catalog_lang_count_sorted
229 = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
230 catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");
231
232 String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
233 String output_directory = "catalog-lang-" + filename_root + "-out";
234 catalog_lang_count_sorted.saveAsTextFile(output_directory);
235 jsc.close();
236 }
237
238
239
240 public static void print_usage(HelpFormatter formatter, Options options)
241 {
242 formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
243 }
244
245 public static void main(String[] args) {
246 Options options = new Options();
247
248 Option verbosity_opt = new Option("v", "verbosity", true,
249 "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
250 verbosity_opt.setRequired(false);
251 options.addOption(verbosity_opt);
252
253 Option properties_opt = new Option("p", "properties", true,
254 "Read in the specified Java properties file");
255 properties_opt.setRequired(false);
256 options.addOption(properties_opt);
257
258 // Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
259 CommandLineParser parser = new GnuParser();
260 //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above
261
262 HelpFormatter formatter = new HelpFormatter();
263 CommandLine cmd = null;
264
265 try {
266 cmd = parser.parse(options, args);
267 }
268 catch (ParseException e) {
269 System.err.println(e.getMessage());
270 print_usage(formatter,options);
271 System.exit(1);
272 }
273
274 String verbosity_str = cmd.getOptionValue("verbosity","1");
275 int verbosity = Integer.parseInt(verbosity_str);
276
277 String property_filename = cmd.getOptionValue("properties",null);
278
279 String[] filtered_args = cmd.getArgs();
280
281 if (filtered_args.length != 2) {
282 print_usage(formatter,options);
283 System.exit(1);
284 }
285
286 if (property_filename != null) {
287 try {
288 FileInputStream fis = new FileInputStream(property_filename);
289 BufferedInputStream bis = new BufferedInputStream(fis);
290
291 System.getProperties().load(bis);
292 }
293 catch (FileNotFoundException e) {
294 e.printStackTrace();
295 System.err.println("File not found: '" + property_filename + "'. Skipping property file read");
296 }
297 catch (IOException e) {
298 System.err.println("IO Exception for: '" + property_filename + "'. Malformed syntax? Skipping property file read");
299 }
300 }
301
302 String input_dir = filtered_args[0];
303 String json_list_filename = filtered_args[1];
304
305 ProcessForCatalogLangCount prep_for_lang
306 = new ProcessForCatalogLangCount(input_dir,json_list_filename,verbosity);
307
308 //prep_for_lang.execCatalogLangCount();
309 prep_for_lang.sampleDown100();
310
311 }
312}
Note: See TracBrowser for help on using the repository browser.