Ignore:
Timestamp:
2016-11-02T21:34:47+13:00 (7 years ago)
Author:
davidb
Message:

More careful treatment of what to do when a JSON file isn't there

File:
1 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java

    r31041 r31045  
    6565    }
    6666   
    67     public void execPerVolume()
    68     {   
    69         String spark_app_name = generateSparkAppName("Per Volume");     
    70        
    71         SparkConf conf = new SparkConf().setAppName(spark_app_name);
    72         JavaSparkContext jsc = new JavaSparkContext(conf);
    73        
    74         if (_verbosity >= 2) {
    75             System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
    76             System.out.println("Default Parallelism: " + jsc.defaultParallelism());
    77         }
    78                
    79         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
    80 
    81         long num_volumes = json_list_data.count();
    82         double per_vol = 100.0/(double)num_volumes;
    83        
    84         DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
    85        
    86         System.err.println();
    87         System.err.println();
    88         System.err.println();
    89         System.err.println("****##### _input_dir =  " + _input_dir);
    90         System.err.println();
    91         System.err.println();
    92         System.err.println();
    93        
    94         PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
    95 
    96         json_list_data.foreach(per_vol_json);
    97        
    98         long num_ids = num_volumes;
    99        
    100         System.out.println("");
    101         System.out.println("############");
    102         System.out.println("# Number of volume ids: " + num_ids);
    103         System.out.println("############");
    104         System.out.println("");
    105 
    106         jsc.close();
    107     }
    10867    public ArrayList<String> extrapolateSolrEndpoints()
    10968    {
     
    12786    }
    12887   
     88    public void execPerVolume()
     89    {   
     90        String spark_app_name = generateSparkAppName("Per Volume");     
     91       
     92        SparkConf conf = new SparkConf().setAppName(spark_app_name);
     93        JavaSparkContext jsc = new JavaSparkContext(conf);
     94       
     95        if (_verbosity >= 2) {
     96            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
     97            System.out.println("Default Parallelism: " + jsc.defaultParallelism());
     98        }
     99               
     100        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
     101
     102        long num_volumes = json_list_data.count();
     103        double per_vol = 100.0/(double)num_volumes;
     104       
     105        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");
     106       
     107        System.err.println();
     108        System.err.println();
     109        System.err.println();
     110        System.err.println("****##### _input_dir =  " + _input_dir);
     111        System.err.println();
     112        System.err.println();
     113        System.err.println();
     114       
     115        PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
     116
     117        json_list_data.foreach(per_vol_json);
     118       
     119        long num_ids = num_volumes;
     120       
     121        System.out.println("");
     122        System.out.println("############");
     123        System.out.println("# Number of volume ids: " + num_ids);
     124        System.out.println("############");
     125        System.out.println("");
     126
     127        jsc.close();
     128    }
     129   
     130   
     131   
    129132    public void execPerPage()
    130133    {   
     
    146149        DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent");
    147150       
     151        //String strict_file_io_str = System.getProperty("wcsa-ef-ingest.strict-file-io","true");
     152        boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io");
     153               
    148154        PerPageJSONFlatmap paged_solr_json_flatmap
    149             = new PerPageJSONFlatmap(_input_dir,_solr_url,_output_dir,_verbosity, per_vol_progress_accum,per_vol);
     155            = new PerPageJSONFlatmap(_input_dir,_solr_url,_output_dir,_verbosity,
     156                                     per_vol_progress_accum,per_vol,
     157                                     strict_file_io);
    150158        JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache();
    151159       
     
    155163        ArrayList<String> solr_endpoints = extrapolateSolrEndpoints();
    156164       
     165       
    157166        PerPageJSONMap paged_json_id_map
    158             = new PerPageJSONMap(_input_dir,solr_endpoints,_output_dir,_verbosity, per_page_progress_accum,1);
     167            = new PerPageJSONMap(_input_dir,solr_endpoints,_output_dir,_verbosity,
     168                                 per_page_progress_accum,1);
    159169        JavaRDD<String> per_page_ids = per_page_jsonobjects.map(paged_json_id_map);
    160170
Note: See TracChangeset for help on using the changeset viewer.