Show
Ignore:
Timestamp:
29.10.2016 15:45:38 (3 years ago)
Author:
davidb
Message:

Introduction of Spark accumulator to measure progress. Output of POST read in and status checked for errors (a warning is printed when the Solr responseHeader status is non-zero).

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30980 r30984  
    1414import org.apache.commons.compress.compressors.CompressorException; 
    1515import org.apache.spark.api.java.function.FlatMapFunction; 
     16import org.apache.spark.util.DoubleAccumulator; 
    1617import org.json.JSONArray; 
    1718import org.json.JSONObject; 
     
    3637    protected int    _verbosity; 
    3738     
    38     public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity) 
     39    DoubleAccumulator _progress_accum; 
     40    double            _progress_step; 
     41     
     42    public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity,  
     43                     DoubleAccumulator progress_accum, double progress_step) 
    3944    { 
    4045        _input_dir  = input_dir; 
     
    4247        _output_dir = output_dir; 
    4348        _verbosity  = verbosity; 
     49         
     50        _progress_accum = progress_accum; 
     51        _progress_step  = progress_step; 
    4452    } 
    4553     
     
    214222             
    215223            // Read response 
    216             BufferedReader in = new BufferedReader(new InputStreamReader( 
    217                     httpcon.getInputStream())); 
     224            StringBuilder sb = new StringBuilder(); 
     225            BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream())); 
    218226            String decodedString; 
    219227            while ((decodedString = in.readLine()) != null) { 
    220                 System.out.println(decodedString); 
     228                //System.out.println(decodedString); 
     229                sb.append(decodedString); 
    221230            } 
    222231            in.close(); 
    223232 
     233            JSONObject solr_status_json = new JSONObject(sb.toString()); 
     234            JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader"); 
     235            if (response_header_json != null) { 
     236                int status = response_header_json.getInt("status"); 
     237                if (status != 0) { 
     238                    System.err.println("Warning: POST request to " + post_url + " returned status " + status); 
     239                    System.err.println("Full response was: " + sb); 
     240                } 
     241            } 
     242            else { 
     243                System.err.println("Failed response to Solr POST: " + sb); 
     244            } 
     245             
     246             
     247             
    224248        } 
    225249        catch (Exception e) { 
     
    341365         
    342366        ids.add(volume_id); 
    343          
     367        _progress_accum.add(_progress_step); 
    344368        return ids.iterator(); 
    345369    } 
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30979 r30984  
    55 
    66import org.apache.spark.api.java.*; 
     7import org.apache.spark.util.DoubleAccumulator; 
    78import org.apache.spark.SparkConf; 
    89 
     
    4647        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 
    4748 
    48         PagedJSON paged_json = new PagedJSON(_input_dir, _solr_url,_output_dir,_verbosity); 
     49        long num_volumes = json_list_data.count(); 
     50        double per_vol = 100.0/(double)num_volumes; 
     51         
     52        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("ProgressPercent"); 
     53 
     54        //sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); 
     55        // ... 
     56        // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
     57 
     58        //accum.value(); 
     59         
     60        PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 
    4961        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 
    5062