Ignore:
Timestamp:
10/29/16 15:45:38 (4 years ago)
Author:
davidb
Message:

Introduction of Spark accumulator to measure progress. Output of POST read in and status checked for

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30980 r30984  
    1414import org.apache.commons.compress.compressors.CompressorException;
    1515import org.apache.spark.api.java.function.FlatMapFunction;
     16import org.apache.spark.util.DoubleAccumulator;
    1617import org.json.JSONArray;
    1718import org.json.JSONObject;
     
    3637    protected int    _verbosity;
    3738   
    38     public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity)
     39    DoubleAccumulator _progress_accum;
     40    double            _progress_step;
     41   
     42    public PagedJSON(String input_dir, String solr_url, String output_dir, int verbosity,
     43                     DoubleAccumulator progress_accum, double progress_step)
    3944    {
    4045        _input_dir  = input_dir;
     
    4247        _output_dir = output_dir;
    4348        _verbosity  = verbosity;
     49       
     50        _progress_accum = progress_accum;
     51        _progress_step  = progress_step;
    4452    }
    4553   
     
    214222           
    215223            // Read response
    216             BufferedReader in = new BufferedReader(new InputStreamReader(
    217                     httpcon.getInputStream()));
     224            StringBuilder sb = new StringBuilder();
     225            BufferedReader in = new BufferedReader(new InputStreamReader(httpcon.getInputStream()));
    218226            String decodedString;
    219227            while ((decodedString = in.readLine()) != null) {
    220                 System.out.println(decodedString);
     228                //System.out.println(decodedString);
     229                sb.append(decodedString);
    221230            }
    222231            in.close();
    223232
     233            JSONObject solr_status_json = new JSONObject(sb.toString());
     234            JSONObject response_header_json = solr_status_json.getJSONObject("responseHeader");
     235            if (response_header_json != null) {
     236                int status = response_header_json.getInt("status");
     237                if (status != 0) {
     238                    System.err.println("Warning: POST request to " + post_url + " returned status " + status);
     239                    System.err.println("Full response was: " + sb);
     240                }
     241            }
     242            else {
     243                System.err.println("Failed response to Solr POST: " + sb);
     244            }
     245           
     246           
     247           
    224248        }
    225249        catch (Exception e) {
     
    341365       
    342366        ids.add(volume_id);
    343        
     367        _progress_accum.add(_progress_step);
    344368        return ids.iterator();
    345369    }
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30979 r30984  
    55
    66import org.apache.spark.api.java.*;
     7import org.apache.spark.util.DoubleAccumulator;
    78import org.apache.spark.SparkConf;
    89
     
    4647        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();
    4748
    48         PagedJSON paged_json = new PagedJSON(_input_dir, _solr_url,_output_dir,_verbosity);
     49        long num_volumes = json_list_data.count();
     50        double per_vol = 100.0/(double)num_volumes;
     51       
     52        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("ProgressPercent");
     53
     54        //sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
     55        // ...
     56        // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
     57
     58        //accum.value();
     59       
     60        PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
    4961        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
    5062
Note: See TracChangeset for help on using the changeset viewer.