Changeset 31266 for other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main
- Timestamp: 2016-12-27T18:51:42+13:00 (7 years ago)
- Location: other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
- Files: 3 edited
Legend:
- Unmodified
- Added
- Removed
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerPageJSONFlatmap.java
r31252 r31266 105 105 ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 106 106 } 107 if (_verbosity >= 2) {107 if (_verbosity >= 3) { 108 108 System.out.print(" Pages: "); 109 109 } … … 113 113 String page_id = volume_id + "." + formatted_i; 114 114 115 if (_verbosity >= 2) {115 if (_verbosity >= 3) { 116 116 if (i>0) { 117 117 System.out.print(", "); … … 123 123 124 124 if (i==(ef_page_count-1)) { 125 if (_verbosity >= 2) {125 if (_verbosity >= 3) { 126 126 System.out.println(); 127 127 } 128 System.out.println("Sample output JSON page file: " + output_json_bz2); 128 if (_verbosity >= 2) { 129 System.out.println("Sample output JSON page file: " + output_json_bz2); 130 } 129 131 } 130 132 -
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerVolumeJSON.java
r31252 r31266 2 2 3 3 import java.io.IOException; 4 import java.util.ArrayList; 5 import java.util.Iterator; 4 6 7 import org.apache.spark.api.java.function.FlatMapFunction; 5 8 import org.apache.spark.api.java.function.VoidFunction; 6 9 import org.apache.spark.util.DoubleAccumulator; … … 18 21 19 22 20 public class PerVolumeJSON implements VoidFunction<String> 23 //public class PerVolumeJSON implements VoidFunction<String> 24 public class PerVolumeJSON implements FlatMapFunction<String,String> 21 25 { 22 26 private static final long serialVersionUID = 1L; … … 58 62 } 59 63 60 //public Iterator<String> call(String json_file_in) 61 public void call(String json_file_in) throws IOException 64 //public void call(String json_file_in) throws IOException 65 public Iterator<String> call(String json_file_in) throws IOException 66 62 67 { 63 68 if ((_whitelist_filename != null) && (_whitelist_bloomfilter == null)) { … … 65 70 } 66 71 72 ArrayList<String> ids = null; 73 67 74 String full_json_file_in = _input_dir + "/" + json_file_in; 68 75 JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in); … … 94 101 } 95 102 96 //ArrayList<String>ids = new ArrayList<String>(ef_num_pages);103 ids = new ArrayList<String>(ef_num_pages); 97 104 for (int i = 0; i < ef_page_count; i++) { 98 105 String formatted_i = String.format("page-%06d", i); … … 104 111 105 112 String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; 106 //ids.add(output_json_bz2); // ****113 ids.add(page_id); 107 114 108 115 if (i==0) { … … 164 171 _progress_accum.add(_progress_step); 165 172 166 //return ids.iterator();173 return ids.iterator(); 167 174 } 168 175 } -
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
r31252 r31266 28 28 // http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ 29 29 30 protected static final int DEFAULT_NUM_CORES = 6; 31 protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES; 30 //protected static final int DEFAULT_NUM_CORES = 6; 31 //protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES; 32 33 protected static final int DEFAULT_FILES_PER_PARTITION = 3000; 32 34 33 35 protected String _input_dir; … … 96 98 SparkConf conf = new SparkConf().setAppName(spark_app_name); 97 99 JavaSparkContext jsc = new JavaSparkContext(conf); 98 100 101 //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 102 int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 103 104 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 105 106 long num_volumes = json_list_data.count(); 107 double per_vol = 100.0/(double)num_volumes; 108 109 int num_partitions = (int)(num_volumes/files_per_partition)+1; 110 111 JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 112 113 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 114 115 System.err.println(); 116 System.err.println(); 117 System.err.println(); 118 System.err.println("****##### _input_dir = " + _input_dir); 119 System.err.println(); 120 System.err.println(); 121 System.err.println(); 122 123 boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 124 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 125 126 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename, 127 _solr_url,_output_dir,_verbosity, progress_accum,per_vol, 128 icu_tokenize,strict_file_io); 129 130 //json_list_data_rp.foreach(per_vol_json); 131 JavaRDD<String> per_page_ids = json_list_data_rp.flatMap(per_vol_json); 132 long num_page_ids = 
per_page_ids.count(); 133 134 long num_ids = num_volumes; 135 136 System.out.println(""); 137 System.out.println("############"); 138 System.out.println("# Number of page ids: " + num_page_ids); 139 System.out.println("############"); 140 System.out.println(""); 141 142 jsc.close(); 143 } 144 145 146 147 public void execPerPage() 148 { 149 String spark_app_name = generateSparkAppName("Per Page"); 150 151 SparkConf conf = new SparkConf().setAppName(spark_app_name); 152 JavaSparkContext jsc = new JavaSparkContext(conf); 153 154 /* 99 155 if (_verbosity >= 2) { 100 156 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 101 157 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 102 158 } 103 104 int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 105 106 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 159 */ 160 161 //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 162 int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 163 164 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 107 165 108 166 long num_volumes = json_list_data.count(); 109 167 double per_vol = 100.0/(double)num_volumes; 110 168 111 //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100)); 112 113 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 114 115 System.err.println(); 116 System.err.println(); 117 System.err.println(); 118 System.err.println("****##### _input_dir = " + _input_dir); 119 System.err.println(); 120 System.err.println(); 121 System.err.println(); 122 123 boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 124 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 125 126 PerVolumeJSON per_vol_json = new 
PerVolumeJSON(_input_dir,_whitelist_filename, 127 _solr_url,_output_dir,_verbosity, progress_accum,per_vol, 128 icu_tokenize,strict_file_io); 129 130 json_list_data.foreach(per_vol_json); 131 132 long num_ids = num_volumes; 133 134 System.out.println(""); 135 System.out.println("############"); 136 System.out.println("# Number of volume ids: " + num_ids); 137 System.out.println("############"); 138 System.out.println(""); 139 140 jsc.close(); 141 } 142 143 144 145 public void execPerPage() 146 { 147 String spark_app_name = generateSparkAppName("Per Page"); 148 149 SparkConf conf = new SparkConf().setAppName(spark_app_name); 150 JavaSparkContext jsc = new JavaSparkContext(conf); 151 152 if (_verbosity >= 2) { 153 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 154 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 155 } 156 157 int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 158 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 159 160 long num_volumes = json_list_data.count(); 161 double per_vol = 100.0/(double)num_volumes; 162 163 //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100)); 169 int num_partitions = (int)(num_volumes/files_per_partition)+1; 170 JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 164 171 165 172 DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); … … 173 180 per_vol_progress_accum,per_vol, 174 181 icu_tokenize,strict_file_io); 175 JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache(); 182 //JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap).cache(); 183 JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap); 176 184 177 185 //long num_page_ids = 
per_page_jsonobjects.count(); // trigger lazy eval of: flatmap:per-vol
Note:
See TracChangeset
for help on using the changeset viewer.