Changeset 31359


Ignore:
Timestamp:
2017-01-26T23:08:16+13:00 (6 years ago)
Author:
davidb
Message:

Changed over to use sequenceFiles as input

Location:
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForCatalogLangCount.java

    r31294 r31359  
    4444    }
    4545       
    46     public void execCatalogLangCount()
     46    public void execCatalogLangCountSparkDirect()
    4747    {   
    48         String spark_app_name = generateSparkAppName("Per Volume");     
     48        String spark_app_name = generateSparkAppName("Spark-Direct + Per Volume");     
    4949       
    5050        SparkConf conf = new SparkConf().setAppName(spark_app_name);
    5151        JavaSparkContext jsc = new JavaSparkContext(conf);
    52 
     52       
    5353        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
    5454        String output_directory = "catalog-lang-" + filename_root + "-out";
     
    110110    }
    111111
     112    public void execCatalogLangCount()
     113    {   
     114       
     115       
     116        String spark_app_name = generateSparkAppName("YARN Cluster + Per Volume");     
     117       
     118        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     119        JavaSparkContext jsc = new JavaSparkContext(conf);
     120        jsc.hadoopConfiguration().set("io.compression.codec.bzip2.library", "java-builtin");
     121       
     122        String packed_sequence_path = "hdfs:///user/capitanu/data/packed-ef";
     123
     124        JavaPairRDD<String, String> inputRdd = jsc.sequenceFile(packed_sequence_path, String.class, String.class);
     125        JavaRDD<String> jsonTextRdd = inputRdd.map(Tuple2::_2);
     126
     127        /*
     128        jsonTextRdd.map(
     129            jsonText -> "" // parse JSON text and do whatever ...
     130        );
     131            */
     132       
     133        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
     134       
     135        PerVolumeCatalogLangSequenceFileMap volume_catalog_langfreq_map
     136            = new PerVolumeCatalogLangSequenceFileMap(_input_dir,_verbosity,strict_file_io);
     137        JavaRDD<String> catalog_lang_list = jsonTextRdd.map(volume_catalog_langfreq_map);
     138        //catalog_lang_list.persist(StorageLevel.MEMORY_AND_DISK());
     139        catalog_lang_list.setName("catalog-lang-stream");
     140   
     141       
     142        JavaPairRDD<String, Long> catalog_lang_pairs = catalog_lang_list.mapToPair(s -> new Tuple2<String, Long>(s, 1L));
     143        catalog_lang_pairs.setName("single-catalog-lang-count");
     144       
     145        JavaPairRDD<String, Long> catalog_lang_counts = catalog_lang_pairs.reduceByKey((a, b) -> a + b);
     146        catalog_lang_counts.setName("catalog-lang-frequency");
     147       
     148        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair
     149            = catalog_lang_counts.mapToPair(item -> item.swap());
     150        catalog_lang_counts_swapped_pair.setName("frequency-catalog-lang-swap");
     151       
     152        JavaPairRDD<Long, String> catalog_lang_counts_swapped_pair_sorted
     153            = catalog_lang_counts_swapped_pair.sortByKey(false);
     154        catalog_lang_counts_swapped_pair_sorted.setName("descending-sorted-frequency-catalog-lang");
     155       
     156        JavaPairRDD<String, Long> catalog_lang_count_sorted
     157            = catalog_lang_counts_swapped_pair_sorted.mapToPair(item -> item.swap());
     158        catalog_lang_count_sorted.setName("descending-catalog-lang-frequency");
     159       
     160        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
     161        String output_directory = "catalog-lang-" + filename_root + "-out";
     162        catalog_lang_count_sorted.saveAsTextFile(output_directory);
     163        jsc.close();
     164    }
     165   
     166   
    112167   
    113168    public static void print_usage(HelpFormatter formatter, Options options)
Note: See TracChangeset for help on using the changeset viewer.