Ignore:
Timestamp:
2016-10-29T16:17:22+13:00 (7 years ago)
Author:
davidb
Message:

Changed to run main processing method as action rather than transform. Done to help accumulator add

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30984 r30985  
    1414import org.apache.commons.compress.compressors.CompressorException;
    1515import org.apache.spark.api.java.function.FlatMapFunction;
     16import org.apache.spark.api.java.function.VoidFunction;
    1617import org.apache.spark.util.DoubleAccumulator;
    1718import org.json.JSONArray;
     
    2829
    2930
    30 class PagedJSON implements FlatMapFunction<String, String>
     31//class PagedJSON implements FlatMapFunction<String, String>
     32class PagedJSON implements VoidFunction<String>
    3133{
    3234    private static final long serialVersionUID = 1L;
     
    226228            String decodedString;
    227229            while ((decodedString = in.readLine()) != null) {
    228                 //System.out.println(decodedString);
    229230                sb.append(decodedString);
    230231            }
     
    252253       
    253254    }
    254     public Iterator<String> call(String json_file_in)
     255   
     256    //public Iterator<String> call(String json_file_in)
     257    public void call(String json_file_in)
    255258    {
    256259        JSONObject extracted_feature_record = readJSONFile(json_file_in);
     
    305308                    System.out.println("Sample output Solr add JSON [page 20]: " + solr_add_doc_json.toString());
    306309                    System.out.println("==================");
    307                     //System.out.println("Sample text [page 20]: " + solr_add_doc_json.getString("_text_"));
    308310                }
    309311               
    310                 // create JSON obj of just the page (for now), and write it out
    311                 // write out the JSONOBject as a bz2 compressed file
    312                 /*
    313                 try {
    314                     BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(_output_dir + "/" + output_json_bz2);
    315                     bw.write(ef_page.toString());
    316                     bw.close();
    317                 } catch (IOException e) {
    318                     e.printStackTrace();
    319                 } catch (CompressorException e) {
    320                     e.printStackTrace();
    321                 }
    322                 */
    323312                           
    324313                if (_solr_url != null) {
     
    346335        }
    347336       
    348         /*
    349         for (int i = 0; i < ef_num_pages; i++)
    350         {
    351             //String post_id = ef_pages.getJSONObject(i).getString("post_id");
    352             //......
    353         }
    354         */
    355         //String pageName = json_obj.getJSONObject("pageInfo").getString("pageName");
    356 /*
    357         JSONArray arr = obj.getJSONArray("posts");
    358         for (int i = 0; i < arr.length(); i++)
    359         {
    360             String post_id = arr.getJSONObject(i).getString("post_id");
    361             ......
    362         }
    363 */
    364        
    365337       
    366338        ids.add(volume_id);
    367339        _progress_accum.add(_progress_step);
    368         return ids.iterator();
     340       
     341        //return ids.iterator();
    369342    }
    370343}
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30984 r30985  
    5151       
    5252        DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("ProgressPercent");
    53 
    54         //sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
    55         // ...
    56         // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    57 
    58         //accum.value();
    5953       
    6054        PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);
    61         JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
     55        //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();
    6256
    63         long num_ids = json_ids.count();
     57        json_list_data.foreach(paged_json);
     58
     59   
     60        //long num_ids = json_ids.count();
     61        long num_ids = num_volumes;
     62       
    6463        System.out.println("");
    6564        System.out.println("############");
     
    6867        System.out.println("");
    6968
     69        /*
    7070        if (_output_dir != null) {
    7171            String rdd_save_file = "rdd-solr-json-page-files";
     
    7777            System.out.println("");
    7878        }
     79        */
    7980       
    8081        jsc.close();
     
    8990
    9091        Options options = new Options();
    91 
    92         //.withType(Integer.class)
    93 /*
    94         options.addOption(OptionBuilder.withLongOpt("verbosity")
    95                 .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]")
    96                 .hasArg()
    97                 .withArgName("v")
    98                 .isRequired(false)
    99                 .create());
    100 */
    101         //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use");
    102         //num_cores_opt.setRequired(false);
    103         //options.addOption(num_cores_opt);
    10492
    10593        Option verbosity_opt = new Option("v", "verbosity", true,
     
    137125            print_usage(formatter,options);
    138126            System.exit(1);
    139             //return;  // prevents 'cmd may not be assigned' compiler error in Eclipse
    140127        }
    141128
    142         //value = ((Integer)cmdLine.getParsedOptionValue("num-cores")).intValue();
    143         //value = ((Integer)cmdLine.getOptionValue("num-cores","2")).intValue();
    144 
    145         //cmd.hasOption("json-filelist")
    146129       
    147130        String verbosity_str = cmd.getOptionValue("verbosity","0");
     
    167150        String input_dir  = filtered_args[0];
    168151        String json_list_filename = filtered_args[1];
    169         //String output_dir = filtered_args[2];
    170 
     152       
    171153        PrepareForIngest prep_for_ingest
    172154            = new PrepareForIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity);
Note: See TracChangeset for help on using the changeset viewer.