Changeset 31251

Show
Ignore:
Timestamp:
19.12.2016 15:13:52 (3 years ago)
Author:
davidb
Message:

Code tidy up. Timed experiment showed sorting by key with num_partitions significantly faster on 1000 slice

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForWhitelist.java

    r31250 r31251  
    6060        JavaSparkContext jsc = new JavaSparkContext(conf); 
    6161 
    62         /* 
    63         if (_verbosity >= 2) { 
    64             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 
    65             System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 
    66         } 
    67             */ 
    68          
    6962        int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 
    7063        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 
    71  
     64        json_list_data.setName("JSON-file-list"); 
     65         
    7266        long num_volumes = json_list_data.count(); 
    7367        double per_vol = 100.0/(double)num_volumes; 
     
    7771        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); 
    7872         
    79         //String strict_file_io_str = System.getProperty("wcsa-ef-ingest.strict-file-io","true"); 
    8073        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 
    8174                 
     
    8477                                     per_vol_progress_accum,per_vol, 
    8578                                     strict_file_io); 
    86         JavaRDD<String> words = json_list_data.flatMap(paged_solr_wordfreq_flatmap); // .cache() ***** 
    87          
     79        JavaRDD<String> words = json_list_data.flatMap(paged_solr_wordfreq_flatmap);  
     80        words.setName("tokenized-words"); 
    8881         
    8982        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 
    9083            public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } 
    9184        }); 
    92  
     85        pairs.setName("single-word-count"); 
     86         
    9387        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { 
    9488            public Integer call(Integer a, Integer b) { return a + b; } 
    9589        }); 
    96  
    97         //counts.map(lambda (x,y): (y,x)); 
    98          
    99          
    100         JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { 
     90        counts.setName("word-frequency"); 
     91         
     92        JavaPairRDD<Integer, String> swapped_pair = counts.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() { 
    10193               @Override 
    10294               public Tuple2<Integer, String> call(Tuple2<String, Integer> item) throws Exception { 
     
    10597 
    10698            }); 
    107  
    108         //JavaPairRDD<Integer, String> sorted_swapped_pair = swappedPair.sortByKey(false,num_partitions); 
    109         JavaPairRDD<Integer, String> sorted_swapped_pair = swappedPair.sortByKey(false,1); 
    110  
     99        swapped_pair.setName("frequency-word-swap"); 
     100         
     101        JavaPairRDD<Integer, String> sorted_swapped_pair = swapped_pair.sortByKey(false,num_partitions); 
     102         
     103        sorted_swapped_pair.setName("descending-sorted-frequency-word"); 
     104         
    111105        JavaPairRDD<String, Integer> sorted_swaped_back_pair = sorted_swapped_pair.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() { 
    112106               @Override 
     
    114108                   return item.swap(); 
    115109               } 
    116  
    117110            }); 
    118  
    119         /* 
    120  
    121         JavaPairRDD<String, Integer> sorted_counts  
    122             = counts.map(item -> item.swap()) // interchanges position of entries in each tuple 
    123                 .sortByKey(true, 1)         // 1st arg configures ascending sort, 2nd arg configures one task 
    124                 .map(item -> item.swap()); 
    125          
     111        sorted_swaped_back_pair.setName("descending-word-frequency"); 
     112         
     113/* 
     114        JavaPairRDD<Integer, String> counts_swapped_pair  
     115            = counts.mapToPair(item -> item.swap()); 
     116        JavaPairRDD<Integer, String> counts_swapped_pair_sorted  
     117            = counts_swapped_pair.sortByKey(true, 1); 
     118        JavaPairRDD<String, Integer> count_sorted = counts_swapped_pair_sorted.mapToPair(item -> item.swap()); 
    126119        */ 
    127120         
    128121         
    129         //sorted_counts.saveAsTextFile(_json_list_filename + ".out"); 
    130122        String filename_root = _json_list_filename.replaceAll(".*/","").replaceAll("\\..*$",""); 
    131123        String output_directory = "whitelist-" + filename_root + "-out";