Ignore:
Timestamp:
2016-10-30T23:51:07+13:00 (7 years ago)
Author:
davidb
Message:

Code to work per-volume and per-page

File:
1 edited

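Legend:

Unmodified (both the old r30998 and the new r31001 line numbers are shown)
Added (only the new r31001 line number is shown)
Removed (only the old r30998 line number is shown)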
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r30998 r31001  
   7    7   import org.apache.spark.util.DoubleAccumulator;
   8    8   import org.hathitrust.extractedfeatures.PagedJSON;
        9 + import org.json.JSONObject;
   9   10   import org.apache.spark.SparkConf;
  10   11
     
  41   42       }
  42   43
  43      -     public void exec()
       44 +     public void execPerVolume()
  44   45       {
  45      -         String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest";
       46 +         String spark_app_name = "[Per Volume] Extract Features: Process for Solr Ingest";
  46   47           spark_app_name += " [" + _json_list_filename + "]";
  47   48
     
  61   62           DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
  62   63
       64 +         PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
       65 +
       66 +         json_list_data.foreach(per_vol_json);
       67 +
       68 +         long num_ids = num_volumes;
       69 +
       70 +         System.out.println("");
       71 +         System.out.println("############");
       72 +         System.out.println("# Number of volume ids: " + num_ids);
       73 +         System.out.println("############");
       74 +         System.out.println("");
       75 +
       76 +         jsc.close();
       77 +     }
       78 +
       79 +     public void execPerPage()
       80 +     {
       81 +         String spark_app_name = "[Per Page] Extract Features: Process for Solr Ingest";
       82 +         spark_app_name += " [" + _json_list_filename + "]";
       83 +
       84 +         SparkConf conf = new SparkConf().setAppName(spark_app_name);
       85 +         JavaSparkContext jsc = new JavaSparkContext(conf);
       86 +
       87 +         if (_verbosity >= 2) {
       88 +             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
       89 +             System.out.println("Default Parallelism: " + jsc.defaultParallelism());
       90 +         }
       91 +
       92 +         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
       93 +
       94 +         long num_volumes = json_list_data.count();
       95 +         double per_vol = 100.0/(double)num_volumes;
       96 +
       97 +         DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
       98 +
  63   99           PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
  64      -         //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
  65      -
  66      -         json_list_data.foreach(paged_json);
  67      -
  68      -
      100 +         JavaRDD<JSONObject> json_per_page_ids = json_list_data.flatMap(paged_json).cache();
      101 +
      102 +         json_per_page_ids.foreach(paged_json);
      103 +
  69  104   /*
  70  105           System.out.println("");
     
  75  110   */
  76  111
  77      -         //long num_ids = json_ids.count();
  78      -         long num_ids = num_volumes;
  79      -
  80      -         System.out.println("");
  81      -         System.out.println("############");
  82      -         System.out.println("# Number of page ids: " + num_ids);
      112 +         long num_page_ids = json_per_page_ids.count();
      113 +
      114 +         System.out.println("");
      115 +         System.out.println("############");
      116 +         System.out.println("# Number of page ids: " + num_page_ids);
  83  117           System.out.println("############");
  84  118           System.out.println("");
     
  99  133       }
 100  134
      135 +
      136 +
      137 +
 101  138       public static void print_usage(HelpFormatter formatter, Options options)
 102  139       {
     
 174  211           ProcessForSolrIngest prep_for_ingest
 175  212               = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);
 176      -         prep_for_ingest.exec();
      213 +         prep_for_ingest.execPerVolume();
 177  214       }
 178  215   }
Note: See TracChangeset for help on using the changeset viewer.