source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java @ 31365

Last change on this file was revision 31365, checked in by davidb, 7 years ago:

Quick code added to downsample

package org.hathitrust.extractedfeatures;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.cli.*;

import org.apache.hadoop.io.Text;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.DoubleAccumulator;

import scala.Tuple2;

public class ProcessForCatalogLangCount implements Serializable
{
	private static final long serialVersionUID = 1L;

	protected static final int DEFAULT_FILES_PER_PARTITION = 3000;

	protected String _input_dir;
	protected String _json_list_filename;

	protected int _verbosity;

	public ProcessForCatalogLangCount(String input_dir, String json_list_filename, int verbosity)
	{
		_input_dir = input_dir;
		// Fall back to the input directory when no separate file-list is given
		_json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

		_verbosity = verbosity;
	}

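	// Builds the application name shown in the Spark UI: the execution mode
	// plus the name of the JSON file-list being processed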
	protected String generateSparkAppName(String exec_mode)
	{
		String spark_app_name = "[" + exec_mode + "] Extracted Features: Process for Catalog Language Labels";
		spark_app_name += " [" + _json_list_filename + "]";

		return spark_app_name;
	}

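	/**
	 * Counts catalog language labels across the volumes named in the JSON
	 * file-list, reading the files directly with Spark. The pipeline is the
	 * classic word-count pattern: flatMap to one language label per volume,
	 * map to (lang, 1) pairs, reduceByKey to sum the counts, then sort
	 * descending by frequency and save the result as text.
	 */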
	public void execCatalogLangCountSparkDirect()
	{
		String spark_app_name = generateSparkAppName("Spark-Direct + Per Volume");

		SparkConf conf = new SparkConf().setAppName(spark_app_name);
		JavaSparkContext jsc = new JavaSparkContext(conf);

		// Derive the output directory name from the file-list name (path and extension stripped)
		String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
		String output_directory = "catalog-lang-" + filename_root + "-out";
		if (ClusterFileIO.exists(output_directory))
		{
			System.err.println("Error: " + output_directory + " already exists. Spark unable to write output data");
			jsc.close();
			System.exit(1);
		}

		int files_per_partition = Integer.getInteger("wcsa-ef-ingest.files-per-partition", DEFAULT_FILES_PER_PARTITION);

		JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
		json_list_data.setName("JSON-file-list");

		long num_volumes = json_list_data.count();
		double per_vol = 100.0/(double)num_volumes; // progress-percent contribution of one volume

		// Round up so every listed file lands in a partition
		int num_partitions = (int)(num_volumes/files_per_partition)+1;

		JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
		json_list_data_rp.setName("JSON-file-list--repartitioned");

		DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");

		boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

		// One catalog language label per volume
		PerVolumeCatalogLangStreamFlatmap volume_catalog_langfreq_flatmap
			= new PerVolumeCatalogLangStreamFlatmap(_input_dir, _verbosity,
								per_vol_progress_accum, per_vol,
								strict_file_io);
		JavaRDD<String> catalog_lang_list = json_list_data_rp.flatMap(volume_catalog_langfreq_flatmap);
		catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
		catalog_lang_list.setName("catalog-lang-stream");

		// Word-count pattern: pair each language with 1, then sum by key
		JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
		catalog_lang_pairs.setName("single-catalog-lang-count");

		JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
		catalog_lang_counts.setName("catalog-lang-frequency");

		// Swap to (count, lang), sort descending on the count key, then swap back
		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
			= catalog_lang_counts.mapToPair(item -> item.swap());
		catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
			= catalog_lang_counts_swapped_pair.sortByKey(false, num_partitions);
		catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

		JavaPairRDD<String, Long> catalog_lang_count_sorted
			= catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
		catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

		catalog_lang_count_sorted.saveAsTextFile(output_directory);
		jsc.close();
	}

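	/**
	 * Quick downsampling hack (per the commit message): draws a sample of the
	 * packed extracted-features sequence file at a rate of 0.0001 (~0.01% of
	 * records), without replacement, with the seed fixed at 42 so the sample
	 * is reproducible. The output directory name "packed-ef-10000" presumably
	 * reflects the expected sample size. Note that saveAsTextFile() writes the
	 * sampled pairs as plain text rather than as a new sequence file.
	 */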
	public void sampleDown()
	{
		String spark_app_name = generateSparkAppName("Spark Cluster + Per Volume");

		SparkConf conf = new SparkConf().setAppName(spark_app_name);
		JavaSparkContext jsc = new JavaSparkContext(conf);
		jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

		String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

		JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);

		// Sample without replacement at a rate of 0.0001, seed fixed at 42
		JavaPairRDD<Text, Text> json_text_sample_rdd = input_pair_rdd.sample(false, 0.0001, 42);

		String output_directory = "packed-ef-10000";
		json_text_sample_rdd.saveAsTextFile(output_directory);

		jsc.close();
	}
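
	/**
	 * Same catalog-language count as execCatalogLangCountSparkDirect(), but
	 * reads the volumes from the packed sequence file on HDFS (one JSON record
	 * per volume) rather than from individually listed JSON files.
	 */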
	public void execCatalogLangCount()
	{
		String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");

		SparkConf conf = new SparkConf().setAppName(spark_app_name);
		JavaSparkContext jsc = new JavaSparkContext(conf);
		jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");

		String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

		JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);

		// Only the JSON payload (the value half of each sequence-file record) is needed
		JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);

		boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

		PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map
			= new PerVolumeCatalogLangSequenceFileMap(_input_dir, _verbosity, strict_file_io);
		JavaRDD<String> catalog_lang_list = json_text_rdd.map(volume_catalog_langfreq_map);
		//catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
		catalog_lang_list.setName("catalog-lang-stream");

		// Word-count pattern: pair each language with 1, then sum by key
		JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
		catalog_lang_pairs.setName("single-catalog-lang-count");

		JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
		catalog_lang_counts.setName("catalog-lang-frequency");

		// Swap to (count, lang), sort descending on the count key, then swap back
		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
			= catalog_lang_counts.mapToPair(item -> item.swap());
		catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

		JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
			= catalog_lang_counts_swapped_pair.sortByKey(false, 20);
		catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

		JavaPairRDD<String, Long> catalog_lang_count_sorted
			= catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
		catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

		String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
		String output_directory = "catalog-lang-" + filename_root + "-out";
		catalog_lang_count_sorted.saveAsTextFile(output_directory);
		jsc.close();
	}

	public static void print_usage(HelpFormatter formatter, Options options)
	{
		formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
	}

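	/**
	 * Typical invocation (illustrative only: the jar and properties filenames
	 * are placeholders, and cluster-specific spark-submit flags are omitted):
	 *
	 *   spark-submit --class org.hathitrust.extractedfeatures.ProcessForCatalogLangCount \
	 *     solr-ingest.jar -p wcsa-ef-ingest.properties json-files-root json-filelist.txt
	 */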
	public static void main(String[] args) {
		Options options = new Options();

		Option verbosity_opt = new Option("v", "verbosity", true,
				"Set to control the level of debugging output [0=none, 1=some, 2=lots]");
		verbosity_opt.setRequired(false);
		options.addOption(verbosity_opt);

		Option properties_opt = new Option("p", "properties", true,
				"Read in the specified Java properties file");
		properties_opt.setRequired(false);
		options.addOption(properties_opt);

		// Need to work with CLI v1.2, as this is the JAR bundled with Hadoop/Spark
		CommandLineParser parser = new GnuParser();
		//CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

		HelpFormatter formatter = new HelpFormatter();
		CommandLine cmd = null;

		try {
			cmd = parser.parse(options, args);
		}
		catch (ParseException e) {
			System.err.println(e.getMessage());
			print_usage(formatter, options);
			System.exit(1);
		}

		String verbosity_str = cmd.getOptionValue("verbosity", "1");
		int verbosity = Integer.parseInt(verbosity_str);

		String property_filename = cmd.getOptionValue("properties", null);

		String[] filtered_args = cmd.getArgs();

		if (filtered_args.length != 2) {
			print_usage(formatter, options);
			System.exit(1);
		}

		// Settings loaded here become Java system properties, which is how the
		// wcsa-ef-ingest.* values reach Integer.getInteger()/Boolean.getBoolean() above
		if (property_filename != null) {
			try {
				FileInputStream fis = new FileInputStream(property_filename);
				BufferedInputStream bis = new BufferedInputStream(fis);

				System.getProperties().load(bis);
			}
			catch (FileNotFoundException e) {
				System.err.println("File not found: '" + property_filename + "'. Skipping property file read");
			}
			catch (IOException e) {
				System.err.println("IO Exception for: '" + property_filename + "'. Malformed syntax? Skipping property file read");
			}
		}

		String input_dir = filtered_args[0];
		String json_list_filename = filtered_args[1];

		ProcessForCatalogLangCount prep_for_lang
			= new ProcessForCatalogLangCount(input_dir, json_list_filename, verbosity);

		//prep_for_lang.execCatalogLangCount();
		prep_for_lang.sampleDown();
	}
}