Changeset 31251 for other-projects


Timestamp: 2016-12-19T15:13:52+13:00
Author: davidb
Message:

Code tidy-up. A timed experiment showed that sorting by key with num_partitions is significantly faster on a 1000-slice run.
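As a point of reference for the timing claim in the message, a comparison of the two sort variants can be reproduced with a small standalone harness. The sketch below is illustrative only, assuming a local Spark master and a synthetic word list; the class name SortByKeyTiming, the data sizes, and the printed labels are not part of the changeset.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    // Hedged sketch of the timing experiment described in the commit message:
    // sortByKey with num_partitions output partitions versus a single partition.
    public class SortByKeyTiming {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("sortByKey timing").setMaster("local[*]");
            JavaSparkContext jsc = new JavaSparkContext(conf);

            int num_partitions = 1000; // the "1000-slice" case mentioned in the message

            // Synthetic words standing in for the real word-frequency data
            List<String> words = new ArrayList<>();
            Random random = new Random(42);
            for (int i = 0; i < 1_000_000; i++) {
                words.add("word" + random.nextInt(50_000));
            }
            JavaRDD<String> word_rdd = jsc.parallelize(words, num_partitions);
            JavaPairRDD<String, Integer> counts = word_rdd
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b);
            JavaPairRDD<Integer, String> swapped = counts.mapToPair(Tuple2::swap);
            swapped.cache().count(); // materialise the input so only the sort is timed

            // Variant used before the change: collapse to one partition while sorting
            long t0 = System.currentTimeMillis();
            swapped.sortByKey(false, 1).count();
            long single = System.currentTimeMillis() - t0;

            // Variant adopted in r31251: keep num_partitions through the sort
            long t1 = System.currentTimeMillis();
            swapped.sortByKey(false, num_partitions).count();
            long parallel = System.currentTimeMillis() - t1;

            System.out.println("sortByKey(false,1):              " + single + " ms");
            System.out.println("sortByKey(false,num_partitions): " + parallel + " ms");

            jsc.close();
        }
    }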

File: 1 edited

Legend: unchanged lines have no prefix; removed lines are prefixed with '-', added lines with '+'.
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForWhitelist.java

--- ProcessForWhitelist.java (r31250)
+++ ProcessForWhitelist.java (r31251)

         JavaSparkContext jsc = new JavaSparkContext(conf);

-        /*
-        if (_verbosity >= 2) {
-            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
-            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
-        }
-            */
-
         int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS);
         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache();
-
+        json_list_data.setName("JSON-file-list");
+
         long num_volumes = json_list_data.count();
         double per_vol = 100.0/(double)num_volumes;
…
         DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");

-        //String strict_file_io_str = System.getProperty("wcsa-ef-ingest.strict-file-io","true");
         boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");

…
                                      per_vol_progress_accum,per_vol,
                                      strict_file_io);
-        JavaRDD<String> words = json_list_data.flatMap(paged_solr_wordfreq_flatmap); // .cache() *****
-
+        JavaRDD<String> words = json_list_data.flatMap(paged_solr_wordfreq_flatmap);
+        words.setName("tokenized-words");

         JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
             public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }
         });
-
+        pairs.setName("single-word-count");
+
         JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
             public Integer call(Integer a, Integer b) { return a + b; }
         });
-
-        //counts.map(lambda (x,y): (y,x));
-
-
-        JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
+        counts.setName("word-frequency");
+
+        JavaPairRDD<Integer, String> swapped_pair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
             @Override
             public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception {
…

            });
-
-        //JavaPairRDD<Integer, String> sorted_swapped_pair = swappedPair.sortByKey(false,num_partitions);
-        JavaPairRDD<Integer, String> sorted_swapped_pair = swappedPair.sortByKey(false,1);
-
+        swapped_pair.setName("frequency-word-swap");
+
+        JavaPairRDD<Integer, String> sorted_swapped_pair = swapped_pair.sortByKey(false,num_partitions);
+
+        sorted_swapped_pair.setName("descending-sorted-frequency-word");
+
         JavaPairRDD<String, Integer> sorted_swaped_back_pair = sorted_swapped_pair.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
             @Override
…
                 return item.swap();
             }
-
            });
-
-        /*
-
-        JavaPairRDD<String, Integer> sorted_counts
-            = counts.map(item -> item.swap()) // interchanges position of entries in each tuple
-                .sortByKey(true, 1)         // 1st arg configures ascending sort, 2nd arg configures one task
-                .map(item -> item.swap());
-
+        sorted_swaped_back_pair.setName("descending-word-frequency");
+
+/*
+        JavaPairRDD<Integer, String> counts_swapped_pair
+            = counts.mapToPair(item -> item.swap());
+        JavaPairRDD<Integer, String> counts_swapped_pair_sorted
+            = counts_swapped_pair.sortByKey(true, 1);
+        JavaPairRDD<String, Integer> count_sorted = counts_swapped_pair_sorted.mapToPair(item -> item.swap());
         */


-        //sorted_counts.saveAsTextFile(_json_list_filename + ".out");
         String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$","");
         String output_directory = "whitelist-" + filename_root + "-out";
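For context, the sketch below shows the shape of the pipeline after this change, with the setName() labels applied. The labels then appear in the RDD lineage reported by toDebugString() and, for cached RDDs, in the Spark web UI. It is a minimal, self-contained sketch using a tiny in-memory word list in place of the real flatMap over volume JSON; the class name, local master setting, and sample data are assumptions, not part of the changeset.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    // Hedged sketch of the word-frequency pipeline as it stands after r31251,
    // with each stage given a readable name via setName().
    public class NamedPipelineSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("named pipeline sketch").setMaster("local[2]");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            int num_partitions = 4; // stand-in for wcsa-ef-ingest.num-partitions

            JavaRDD<String> words = jsc.parallelize(
                Arrays.asList("the", "whitelist", "the", "solr", "the", "whitelist"), num_partitions);
            words.setName("tokenized-words");

            JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));
            pairs.setName("single-word-count");

            JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
            counts.setName("word-frequency");

            JavaPairRDD<Integer, String> swapped_pair = counts.mapToPair(Tuple2::swap);
            swapped_pair.setName("frequency-word-swap");

            // Descending sort, keeping num_partitions output partitions as in r31251
            JavaPairRDD<Integer, String> sorted_swapped_pair = swapped_pair.sortByKey(false, num_partitions);
            sorted_swapped_pair.setName("descending-sorted-frequency-word");

            JavaPairRDD<String, Integer> sorted_swapped_back_pair = sorted_swapped_pair.mapToPair(Tuple2::swap);
            sorted_swapped_back_pair.setName("descending-word-frequency");

            // The setName() labels show up in the lineage string below
            System.out.println(sorted_swapped_back_pair.toDebugString());
            sorted_swapped_back_pair.collect().forEach(System.out::println);

            jsc.close();
        }
    }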