Changeset 30984 for other-projects/hathitrust
- Timestamp:
- 2016-10-29T15:45:38+13:00 (7 years ago)
- Location:
- other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java
r30980 r30984 14 14 import org.apache.commons.compress.compressors.CompressorException; 15 15 import org.apache.spark.api.java.function.FlatMapFunction; 16 import org.apache.spark.util.DoubleAccumulator; 16 17 import org.json.JSONArray; 17 18 import org.json.JSONObject; … … 36 37 protected int _verbosity; 37 38 38 public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity) 39 DoubleAccumulator _progress_accum; 40 double _progress_step; 41 42 public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity, 43 DoubleAccumulator progress_accum, double progress_step) 39 44 { 40 45 _input_dir = input_dir; … … 42 47 _output_dir = output_dir; 43 48 _verbosity = verbosity; 49 50 _progress_accum = progress_accum; 51 _progress_step = progress_step; 44 52 } 45 53 … … 214 222 215 223 // Read response 216 BufferedReader in = new BufferedReader(new InputStreamReader(217 224 StringBuilder sb = new StringBuilder(); 225 BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream())); 218 226 String decodedString; 219 227 while ((decodedString = in.readLine()) != null) { 220 System.out.println(decodedString); 228 //System.out.println(decodedString); 229 sb.append(decodedString); 221 230 } 222 231 in.close(); 223 232 233 JSONObject solr_status_json = new JSONObject(sb.toString()); 234 JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader"); 235 if (response_header_json != null) { 236 int status = response_header_json.getInt("status"); 237 if (status != 0) { 238 System.err.println("Warning: POST request to " + post_url + " returned status " + status); 239 System.err.println("Full response was: " + sb); 240 } 241 } 242 else { 243 System.err.println("Failed response to Solr POST: " + sb); 244 } 245 246 247 224 248 } 225 249 catch (Exception e) { … … 341 365 342 366 ids.add(volume_id); 343 367 _progress_accum.add(_progress_step); 344 368 return ids.iterator(); 345 369 } -
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java
r30979 r30984 5 5 6 6 import org.apache.spark.api.java.*; 7 import org.apache.spark.util.DoubleAccumulator; 7 8 import org.apache.spark.SparkConf; 8 9 … … 46 47 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 47 48 48 PagedJSON paged_json = new PagedJSON(_input_dir, _solr_url,_output_dir,_verbosity); 49 long num_volumes = json_list_data.count(); 50 double per_vol = 100.0/(double)num_volumes; 51 52 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("ProgressPercent"); 53 54 //sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); 55 // ... 56 // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 57 58 //accum.value(); 59 60 PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 49 61 JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 50 62
Note:
See TracChangeset
for help on using the changeset viewer.