Changeset 30985 for other-projects

Show
Ignore:
Timestamp:
29.10.2016 16:17:22 (3 years ago)
Author:
davidb
Message:

Changed to run main processing method as action rather than transform. Done to help accumulator add

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30984 r30985  
    1414import org.apache.commons.compress.compressors.CompressorException; 
    1515import org.apache.spark.api.java.function.FlatMapFunction; 
     16import org.apache.spark.api.java.function.VoidFunction; 
    1617import org.apache.spark.util.DoubleAccumulator; 
    1718import org.json.JSONArray; 
     
    2829 
    2930 
    30 class PagedJSON implements FlatMapFunction<String, String>  
     31//class PagedJSON implements FlatMapFunction<String, String>  
     32class PagedJSON implements VoidFunction<String>  
    3133{ 
    3234    private static final long serialVersionUID = 1L; 
     
    226228            String decodedString; 
    227229            while ((decodedString = in.readLine()) != null) { 
    228                 //System.out.println(decodedString); 
    229230                sb.append(decodedString); 
    230231            } 
     
    252253         
    253254    } 
    254     public Iterator<String> call(String json_file_in)  
     255     
     256    //public Iterator<String> call(String json_file_in)  
     257    public void call(String json_file_in)  
    255258    {  
    256259        JSONObject extracted_feature_record = readJSONFile(json_file_in); 
     
    305308                    System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString()); 
    306309                    System.out.println("=================="); 
    307                     //System.out.println("Sample text [page 20]: " + solr_add_doc_json.getString("_text_")); 
    308310                } 
    309311                 
    310                 // create JSON obj of just the page (for now), and write it out 
    311                 // write out the JSONOBject as a bz2 compressed file 
    312                 /* 
    313                 try { 
    314                     BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_json_bz2); 
    315                     bw.write(ef_page.toString()); 
    316                     bw.close(); 
    317                 } catch (IOException e) { 
    318                     e.printStackTrace(); 
    319                 } catch (CompressorException e) { 
    320                     e.printStackTrace(); 
    321                 } 
    322                 */ 
    323312                             
    324313                if (_solr_url != null) { 
     
    346335        } 
    347336         
    348         /* 
    349         for (int i = 0; i < ef_num_pages; i++) 
    350         { 
    351             //String post_id = ef_pages.getJSONObject(i).getString("post_id"); 
    352             //...... 
    353         } 
    354         */ 
    355         //String pageName = json_obj.getJSONObject("pageInfo").getString("pageName"); 
    356 /* 
    357         JSONArray arr = obj.getJSONArray("posts"); 
    358         for (int i = 0; i < arr.length(); i++) 
    359         { 
    360             String post_id = arr.getJSONObject(i).getString("post_id"); 
    361             ...... 
    362         } 
    363 */ 
    364          
    365337         
    366338        ids.add(volume_id); 
    367339        _progress_accum.add(_progress_step); 
    368         return ids.iterator(); 
     340         
     341        //return ids.iterator(); 
    369342    } 
    370343} 
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30984 r30985  
    5151         
    5252        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("ProgressPercent"); 
    53  
    54         //sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x)); 
    55         // ... 
    56         // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 
    57  
    58         //accum.value(); 
    5953         
    6054        PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 
    61         JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 
     55        //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 
    6256 
    63         long num_ids = json_ids.count(); 
     57        json_list_data.foreach(paged_json); 
     58 
     59     
     60        //long num_ids = json_ids.count(); 
     61        long num_ids = num_volumes; 
     62         
    6463        System.out.println(""); 
    6564        System.out.println("############"); 
     
    6867        System.out.println(""); 
    6968 
     69        /* 
    7070        if (_output_dir != null) { 
    7171            String rdd_save_file = "rdd-solr-json-page-files"; 
     
    7777            System.out.println(""); 
    7878        } 
     79        */ 
    7980         
    8081        jsc.close(); 
     
    8990 
    9091        Options options = new Options(); 
    91  
    92         //.withType(Integer.class) 
    93 /* 
    94         options.addOption(OptionBuilder.withLongOpt("verbosity") 
    95                 .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]") 
    96                 .hasArg() 
    97                 .withArgName("v") 
    98                 .isRequired(false) 
    99                 .create()); 
    100 */ 
    101         //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use"); 
    102         //num_cores_opt.setRequired(false); 
    103         //options.addOption(num_cores_opt); 
    10492 
    10593        Option verbosity_opt = new Option("v", "verbosity", true,  
     
    137125            print_usage(formatter,options); 
    138126            System.exit(1); 
    139             //return;  // prevents 'cmd may not be assigned' compiler error in Eclipse 
    140127        } 
    141128 
    142         //value = ((Integer)cmdLine.getParsedOptionValue("num-cores")).intValue(); 
    143         //value = ((Integer)cmdLine.getOptionValue("num-cores","2")).intValue(); 
    144  
    145         //cmd.hasOption("json-filelist") 
    146129         
    147130        String verbosity_str = cmd.getOptionValue("verbosity","0"); 
     
    167150        String input_dir  = filtered_args[0]; 
    168151        String json_list_filename = filtered_args[1]; 
    169         //String output_dir = filtered_args[2]; 
    170  
     152         
    171153        PrepareForIngest prep_for_ingest  
    172154            = new PrepareForIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);