Changeset 31359 for other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java
- Timestamp:
- 2017-01-26T23:08:16+13:00 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java
r31294 r31359 44 44 } 45 45 46 public void execCatalogLangCount ()46 public void execCatalogLangCountSparkDirect() 47 47 { 48 String spark_app_name = generateSparkAppName(" Per Volume");48 String spark_app_name = generateSparkAppName("Spark-Direct + Per Volume"); 49 49 50 50 SparkConf conf = new SparkConf().setAppName(spark_app_name); 51 51 JavaSparkContext jsc = new JavaSparkContext(conf); 52 52 53 53 String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$",""); 54 54 String output_directory = "catalog-lang-" + filename_root + "-out"; … … 110 110 } 111 111 112 public void execCatalogLangCount() 113 { 114 115 116 String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume"); 117 118 SparkConf conf = new SparkConf().setAppName(spark_app_name); 119 JavaSparkContext jsc = new JavaSparkContext(conf); 120 jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin"); 121 122 String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef"; 123 124 JavaPairRDD<String, String> inputRdd = jsc.sequenceFile(packed_sequence_path, String.class, String.class); 125 JavaRDD<String> jsonTextRdd = inputRdd.map(Tuple2::_2); 126 127 /* 128 jsonTextRdd.map( 129 jsonText -> "" // parse JSON text and do whatever ... 130 ); 131 */ 132 133 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 134 135 PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map 136 = new PerVolumeCatalogLangSequenceFileMap(_input_dir,_verbosity,strict_file_io); 137 JavaRDD<String> catalog_lang_list = jsonTextRdd.map(volume_catalog_langfreq_map); 138 //catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK()); 139 catalog_lang_list.setName("catalog-lang-stream"); 140 141 142 JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L)); 143 catalog_lang_pairs.setName("single-catalog-lang-count"); 144 145 JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b); 146 catalog_lang_counts.setName("catalog-lang-frequency"); 147 148 JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair 149 = catalog_lang_counts.mapToPair(item -> item.swap()); 150 catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap"); 151 152 JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted 153 = catalog_lang_counts_swapped_pair.sortByKey(false); 154 catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang"); 155 156 JavaPairRDD<String, Long> catalog_lang_count_sorted 157 = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap()); 158 catalog_lang_count_sorted.setName("descending-catalog-lang-frequency"); 159 160 String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$",""); 161 String output_directory = "catalog-lang-" + filename_root + "-out"; 162 catalog_lang_count_sorted.saveAsTextFile(output_directory); 163 jsc.close(); 164 } 165 166 112 167 113 168 public static void print_usage(HelpFormatter formatter, Options options)
Note:
See TracChangeset
for help on using the changeset viewer.