Changeset 31045 for other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
- Timestamp:
- 2016-11-02T21:34:47+13:00 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
r31041 r31045 65 65 } 66 66 67 public void execPerVolume()68 {69 String spark_app_name = generateSparkAppName("Per Volume");70 71 SparkConf conf = new SparkConf().setAppName(spark_app_name);72 JavaSparkContext jsc = new JavaSparkContext(conf);73 74 if (_verbosity >= 2) {75 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());76 System.out.println("Default Parallelism: " + jsc.defaultParallelism());77 }78 79 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();80 81 long num_volumes = json_list_data.count();82 double per_vol = 100.0/(double)num_volumes;83 84 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");85 86 System.err.println();87 System.err.println();88 System.err.println();89 System.err.println("****##### _input_dir = " + _input_dir);90 System.err.println();91 System.err.println();92 System.err.println();93 94 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);95 96 json_list_data.foreach(per_vol_json);97 98 long num_ids = num_volumes;99 100 System.out.println("");101 System.out.println("############");102 System.out.println("# Number of volume ids: " + num_ids);103 System.out.println("############");104 System.out.println("");105 106 jsc.close();107 }108 67 public ArrayList<String> extrapolateSolrEndpoints() 109 68 { … … 127 86 } 128 87 88 public void execPerVolume() 89 { 90 String spark_app_name = generateSparkAppName("Per Volume"); 91 92 SparkConf conf = new SparkConf().setAppName(spark_app_name); 93 JavaSparkContext jsc = new JavaSparkContext(conf); 94 95 if (_verbosity >= 2) { 96 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 97 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 98 } 99 100 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 101 102 long num_volumes = json_list_data.count(); 103 double per_vol = 100.0/(double)num_volumes; 104 105 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 106 107 System.err.println(); 108 System.err.println(); 109 System.err.println(); 110 System.err.println("****##### _input_dir = " + _input_dir); 111 System.err.println(); 112 System.err.println(); 113 System.err.println(); 114 115 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 116 117 json_list_data.foreach(per_vol_json); 118 119 long num_ids = num_volumes; 120 121 System.out.println(""); 122 System.out.println("############"); 123 System.out.println("# Number of volume ids: " + num_ids); 124 System.out.println("############"); 125 System.out.println(""); 126 127 jsc.close(); 128 } 129 130 131 129 132 public void execPerPage() 130 133 { … … 146 149 DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); 147 150 151 //String strict_file_io_str = System.getProperty("wcsa-ef-ingest.strict-file-io","true"); 152 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 153 148 154 PerPageJSONFlatmap paged_solr_json_flatmap 149 = new PerPageJSONFlatmap(_input_dir,_solr_url,_output_dir,_verbosity, per_vol_progress_accum,per_vol); 155 = new PerPageJSONFlatmap(_input_dir,_solr_url,_output_dir,_verbosity, 156 per_vol_progress_accum,per_vol, 157 strict_file_io); 150 158 JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache(); 151 159 … … 155 163 ArrayList<String> solr_endpoints = extrapolateSolrEndpoints(); 156 164 165 157 166 PerPageJSONMap paged_json_id_map 158 = new PerPageJSONMap(_input_dir,solr_endpoints,_output_dir,_verbosity, per_page_progress_accum,1); 167 = new PerPageJSONMap(_input_dir,solr_endpoints,_output_dir,_verbosity, 168 per_page_progress_accum,1); 159 169 JavaRDD<String> per_page_ids = per_page_jsonobjects.map(paged_json_id_map); 160 170
Note:
See TracChangeset
for help on using the changeset viewer.