Changeset 31001 for other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
- Timestamp:
- 2016-10-30T23:51:07+13:00 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
r30998 r31001 7 7 import org.apache.spark.util.DoubleAccumulator; 8 8 import org.hathitrust.extractedfeatures.PagedJSON; 9 import org.json.JSONObject; 9 10 import org.apache.spark.SparkConf; 10 11 … … 41 42 } 42 43 43 public void exec ()44 public void execPerVolume() 44 45 { 45 String spark_app_name = " HathiTrust Extract Features: Preparefor Solr Ingest";46 String spark_app_name = "[Per Volume] Extract Features: Process for Solr Ingest"; 46 47 spark_app_name += " [" + _json_list_filename + "]"; 47 48 … … 61 62 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 62 63 64 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 65 66 json_list_data.foreach(per_vol_json); 67 68 long num_ids = num_volumes; 69 70 System.out.println(""); 71 System.out.println("############"); 72 System.out.println("# Number of volume ids: " + num_ids); 73 System.out.println("############"); 74 System.out.println(""); 75 76 jsc.close(); 77 } 78 79 public void execPerPage() 80 { 81 String spark_app_name = "[Per Page] Extract Features: Process for Solr Ingest"; 82 spark_app_name += " [" + _json_list_filename + "]"; 83 84 SparkConf conf = new SparkConf().setAppName(spark_app_name); 85 JavaSparkContext jsc = new JavaSparkContext(conf); 86 87 if (_verbosity >= 2) { 88 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 89 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 90 } 91 92 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 93 94 long num_volumes = json_list_data.count(); 95 double per_vol = 100.0/(double)num_volumes; 96 97 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 98 63 99 PagedJSON paged_json = new PagedJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 64 //JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 65 66 json_list_data.foreach(paged_json); 67 68 100 JavaRDD<JSONObject> json_per_page_ids = json_list_data.flatMap(paged_json).cache(); 101 102 json_per_page_ids.foreach(paged_json); 103 69 104 /* 70 105 System.out.println(""); … … 75 110 */ 76 111 77 //long num_ids = json_ids.count(); 78 long num_ids = num_volumes; 79 80 System.out.println(""); 81 System.out.println("############"); 82 System.out.println("# Number of page ids: " + num_ids); 112 long num_page_ids = json_per_page_ids.count(); 113 114 System.out.println(""); 115 System.out.println("############"); 116 System.out.println("# Number of page ids: " + num_page_ids); 83 117 System.out.println("############"); 84 118 System.out.println(""); … … 99 133 } 100 134 135 136 137 101 138 public static void print_usage(HelpFormatter formatter, Options options) 102 139 { … … 174 211 ProcessForSolrIngest prep_for_ingest 175 212 = new ProcessForSolrIngest(input_dir,json_list_filename,solr_url,output_dir,verbosity); 176 prep_for_ingest.exec ();213 prep_for_ingest.execPerVolume(); 177 214 } 178 215 }
Note:
See TracChangeset
for help on using the changeset viewer.