Show
Ignore:
Timestamp:
26.01.2017 23:08:16 (3 years ago)
Author:
davidb
Message:

Changed over to use sequenceFiles as input

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
1 added
1 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java

    r31294 r31359  
    4444    } 
    4545         
    46     public void execCatalogLangCount() 
     46    public void execCatalogLangCountSparkDirect() 
    4747    {    
    48         String spark_app_name = generateSparkAppName("Per Volume");      
     48        String spark_app_name = generateSparkAppName("Spark-Direct + Per Volume");       
    4949         
    5050        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
    5151        JavaSparkContext jsc = new JavaSparkContext(conf); 
    52  
     52         
    5353        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$",""); 
    5454        String output_directory = "catalog-lang-" + filename_root + "-out"; 
     
    110110    } 
    111111 
     112    public void execCatalogLangCount() 
     113    {    
     114         
     115         
     116        String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");       
     117         
     118        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
     119        JavaSparkContext jsc = new JavaSparkContext(conf); 
     120        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin"); 
     121         
     122        String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef"; 
     123 
     124        JavaPairRDD<String, String> inputRdd = jsc.sequenceFile(packed_sequence_path, String.class, String.class); 
     125        JavaRDD<String> jsonTextRdd = inputRdd.map(Tuple2::_2); 
     126 
     127        /* 
     128        jsonTextRdd.map( 
     129            jsonText -> "" // parse JSON text and do whatever ... 
     130        ); 
     131            */ 
     132         
     133        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 
     134         
     135        PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map 
     136            = new PerVolumeCatalogLangSequenceFileMap(_input_dir,_verbosity,strict_file_io); 
     137        JavaRDD<String> catalog_lang_list = jsonTextRdd.map(volume_catalog_langfreq_map);  
     138        //catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK()); 
     139        catalog_lang_list.setName("catalog-lang-stream"); 
     140     
     141         
     142        JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L)); 
     143        catalog_lang_pairs.setName("single-catalog-lang-count"); 
     144         
     145        JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b); 
     146        catalog_lang_counts.setName("catalog-lang-frequency"); 
     147         
     148        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair 
     149            = catalog_lang_counts.mapToPair(item -> item.swap()); 
     150        catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap"); 
     151         
     152        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted  
     153            = catalog_lang_counts_swapped_pair.sortByKey(false); 
     154        catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang"); 
     155         
     156        JavaPairRDD<String, Long> catalog_lang_count_sorted  
     157            = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap()); 
     158        catalog_lang_count_sorted.setName("descending-catalog-lang-frequency"); 
     159         
     160        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$",""); 
     161        String output_directory = "catalog-lang-" + filename_root + "-out"; 
     162        catalog_lang_count_sorted.saveAsTextFile(output_directory); 
     163        jsc.close(); 
     164    } 
     165     
     166     
    112167     
    113168    public static void print_usage(HelpFormatter formatter, Options options)