source: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java@31364

Last change on this file since 31364 was 31364, checked in by davidb, 7 years ago

removed sample() line

  • Property svn:executable set to *
File size: 9.1 KB
package org.hathitrust.extractedfeatures;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.cli.*;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.DoubleAccumulator;
import scala.Tuple2;
public class ProcessForCatalogLangCount implements Serializable
{
    private static final long serialVersionUID = 1L;

    protected static final int DEFAULT_FILES_PER_PARTITION = 3000;

    protected String _input_dir;
    protected String _json_list_filename;

    protected int _verbosity;

    public ProcessForCatalogLangCount(String input_dir, String json_list_filename, int verbosity)
    {
        _input_dir = input_dir;
        _json_list_filename = (json_list_filename != null) ? json_list_filename : input_dir;

        _verbosity = verbosity;
    }

    protected String generateSparkAppName(String exec_mode)
    {
        String spark_app_name = "[" + exec_mode + "] Extracted Features: Process for Catalog Language Labels";
        spark_app_name += " [" + _json_list_filename + "]";

        return spark_app_name;
    }

    public void execCatalogLangCountSparkDirect()
    {
        String spark_app_name = generateSparkAppName("Spark-Direct + Per Volume");

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);

        String filename_root = _json_list_filename.replaceAll(".*/", "").replaceAll("\\..*$", "");
        String output_directory = "catalog-lang-" + filename_root + "-out";
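        // saveAsTextFile() further down will not overwrite an existing directory, so
        // fail fast here rather than partway through the job.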
        if (ClusterFileIO.exists(output_directory))
        {
            System.err.println("Error: " + output_directory + " already exists. Spark is unable to write its output data");
            jsc.close();
            System.exit(1);
        }

        //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
        int files_per_partition = Integer.getInteger("wcsa-ef-ingest.files-per-partition", DEFAULT_FILES_PER_PARTITION);
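        // Integer.getInteger() reads a JVM system property (set with -D, or via the
        // properties file loaded in main()), not an environment variable.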

        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache();
        json_list_data.setName("JSON-file-list");

        long num_volumes = json_list_data.count();
        double per_vol = 100.0 / (double)num_volumes;

        int num_partitions = (int)(num_volumes / files_per_partition) + 1;

        JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions);
        json_list_data_rp.setName("JSON-file-list--repartitioned");
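        // The repartitioning above aims for roughly files_per_partition filenames per
        // partition; the +1 rounds up and guarantees at least one partition.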

        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");
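        // The flatmap below is handed this accumulator together with per_vol
        // (= 100/num_volumes), presumably adding per_vol as each volume completes,
        // so the driver-side total reads directly as percent complete.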

        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

        PerVolumeCatalogLangStreamFlatmap volume_catalog_langfreq_flatmap
            = new PerVolumeCatalogLangStreamFlatmap(_input_dir, _verbosity,
                                                    per_vol_progress_accum, per_vol,
                                                    strict_file_io);
        JavaRDD<String> catalog_lang_list = json_list_data_rp.flatMap(volume_catalog_langfreq_flatmap);
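        // MEMORY_AND_DISK spills partitions that do not fit in memory to disk rather
        // than recomputing them, avoiding a reread/reparse of the JSON input.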
        catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
        catalog_lang_list.setName("catalog-lang-stream");
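        // Classic word-count pattern: emit (lang, 1L) per volume, then sum per language.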
        JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
        catalog_lang_pairs.setName("single-catalog-lang-count");

        JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
        catalog_lang_counts.setName("catalog-lang-frequency");
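        // Pair RDDs sort by key only, so ranking languages by count means swapping to
        // (count, lang), calling sortByKey(false) for descending order, then swapping back.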
        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
            = catalog_lang_counts.mapToPair(item -> item.swap());
        catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
            = catalog_lang_counts_swapped_pair.sortByKey(false, num_partitions);
        catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

        JavaPairRDD<String, Long> catalog_lang_count_sorted
            = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
        catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

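        // saveAsTextFile() writes one part-NNNNN text file per partition into output_directory.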
        catalog_lang_count_sorted.saveAsTextFile(output_directory);
        jsc.close();
    }

    public void execCatalogLangCount()
    {
        String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");

        SparkConf conf = new SparkConf().setAppName(spark_app_name);
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
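        // "java-builtin" selects Hadoop's pure-Java bzip2 codec rather than the native
        // binding; presumably chosen so workers need no native libbz2 installed.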

        String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";

        JavaPairRDD<Text, Text> input_pair_rdd = jsc.sequenceFile(packed_sequence_path, Text.class, Text.class);
        //JavaRDD<Text> jsonTextRdd = input_pair_rdd.map(Tuple2::_2);
        JavaRDD<Text> json_text_rdd = input_pair_rdd.map(item -> item._2);
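        // Each SequenceFile record is a (key, JSON-document) pair of Text values; the
        // map above keeps only the JSON payload and drops the keys.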

        //JavaRDD<Text> json_text_sample_rdd = json_text_rdd.sample(false, 0.0001);

        /*
        jsonTextRdd.map(
            jsonText -> "" // parse JSON text and do whatever ...
        );
        */

        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

        PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map
            = new PerVolumeCatalogLangSequenceFileMap(_input_dir, _verbosity, strict_file_io);
        JavaRDD<String> catalog_lang_list = json_text_rdd.map(volume_catalog_langfreq_map);
        //catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
        catalog_lang_list.setName("catalog-lang-stream");
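        // From here the pipeline mirrors execCatalogLangCountSparkDirect(): count
        // occurrences per language, sort descending by count, and save.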

        JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
        catalog_lang_pairs.setName("single-catalog-lang-count");

        JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
        catalog_lang_counts.setName("catalog-lang-frequency");

        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
            = catalog_lang_counts.mapToPair(item -> item.swap());
        catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");

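        // Note: the sort's partition count is hard-coded to 20 here, unlike the
        // property-derived num_partitions used in the Spark-direct variant.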
        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
            = catalog_lang_counts_swapped_pair.sortByKey(false, 20);
        catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");

        JavaPairRDD<String, Long> catalog_lang_count_sorted
            = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
        catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");

        String filename_root = _json_list_filename.replaceAll(".*/", "").replaceAll("\\..*$", "");
        String output_directory = "catalog-lang-" + filename_root + "-out";
        catalog_lang_count_sorted.saveAsTextFile(output_directory);
        jsc.close();
    }

    public static void print_usage(HelpFormatter formatter, Options options)
    {
        formatter.printHelp("RUN.bash [options] input-dir json-filelist.txt", options);
    }

    public static void main(String[] args) {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        Option properties_opt = new Option("p", "properties", true,
                "Read in the specified Java properties file");
        properties_opt.setRequired(false);
        options.addOption(properties_opt);

        // Need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
        CommandLineParser parser = new GnuParser();
        //CommandLineParser parser = new DefaultParser(); // if working with CLI v1.3 and above

        HelpFormatter formatter = new HelpFormatter();
        CommandLine cmd = null;

        try {
            cmd = parser.parse(options, args);
        }
        catch (ParseException e) {
            System.err.println(e.getMessage());
            print_usage(formatter, options);
            System.exit(1);
        }

        String verbosity_str = cmd.getOptionValue("verbosity", "1");
        int verbosity = Integer.parseInt(verbosity_str);

        String property_filename = cmd.getOptionValue("properties", null);

        String[] filtered_args = cmd.getArgs();

        if (filtered_args.length != 2) {
            print_usage(formatter, options);
            System.exit(1);
        }

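        // Keys loaded here land in System properties, which makes them visible to the
        // Integer.getInteger()/Boolean.getBoolean() lookups in the exec methods above.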
        if (property_filename != null) {
            // try-with-resources ensures the streams are closed on all paths
            try (FileInputStream fis = new FileInputStream(property_filename);
                 BufferedInputStream bis = new BufferedInputStream(fis)) {

                System.getProperties().load(bis);
            }
            catch (FileNotFoundException e) {
                e.printStackTrace();
                System.err.println("File not found: '" + property_filename + "'. Skipping property file read");
            }
            catch (IOException e) {
                System.err.println("IO Exception for: '" + property_filename + "'. Malformed syntax? Skipping property file read");
            }
        }

        String input_dir = filtered_args[0];
        String json_list_filename = filtered_args[1];

        ProcessForCatalogLangCount prep_for_lang
            = new ProcessForCatalogLangCount(input_dir, json_list_filename, verbosity);

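        // main() drives the YARN/SequenceFile variant; execCatalogLangCountSparkDirect()
        // remains available for reading per-volume JSON files named in json-filelist.txt.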
        prep_for_lang.execCatalogLangCount();
    }
}