Changeset 31266 for other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
- Timestamp: 2016-12-27T18:51:42+13:00 (7 years ago)
- Files: 1 edited
Legend:
- Unmodified
- Added
- Removed
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
r31252 r31266 28 28 // http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ 29 29 30 protected static final int DEFAULT_NUM_CORES = 6; 31 protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES; 30 //protected static final int DEFAULT_NUM_CORES = 6; 31 //protected static final int DEFAULT_NUM_PARTITIONS = 3*DEFAULT_NUM_CORES; 32 33 protected static final int DEFAULT_FILES_PER_PARTITION = 3000; 32 34 33 35 protected String _input_dir; … … 96 98 SparkConf conf = new SparkConf().setAppName(spark_app_name); 97 99 JavaSparkContext jsc = new JavaSparkContext(conf); 98 100 101 //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 102 int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 103 104 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 105 106 long num_volumes = json_list_data.count(); 107 double per_vol = 100.0/(double)num_volumes; 108 109 int num_partitions = (int)(num_volumes/files_per_partition)+1; 110 111 JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 112 113 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 114 115 System.err.println(); 116 System.err.println(); 117 System.err.println(); 118 System.err.println("****##### _input_dir = " + _input_dir); 119 System.err.println(); 120 System.err.println(); 121 System.err.println(); 122 123 boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 124 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 125 126 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_whitelist_filename, 127 _solr_url,_output_dir,_verbosity, progress_accum,per_vol, 128 icu_tokenize,strict_file_io); 129 130 //json_list_data_rp.foreach(per_vol_json); 131 JavaRDD<String> per_page_ids = json_list_data_rp.flatMap(per_vol_json); 132 long num_page_ids = 
per_page_ids.count(); 133 134 long num_ids = num_volumes; 135 136 System.out.println(""); 137 System.out.println("############"); 138 System.out.println("# Number of page ids: " + num_page_ids); 139 System.out.println("############"); 140 System.out.println(""); 141 142 jsc.close(); 143 } 144 145 146 147 public void execPerPage() 148 { 149 String spark_app_name = generateSparkAppName("Per Page"); 150 151 SparkConf conf = new SparkConf().setAppName(spark_app_name); 152 JavaSparkContext jsc = new JavaSparkContext(conf); 153 154 /* 99 155 if (_verbosity >= 2) { 100 156 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 101 157 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 102 158 } 103 104 int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 105 106 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 159 */ 160 161 //int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 162 int files_per_partition = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_FILES_PER_PARTITION); 163 164 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 107 165 108 166 long num_volumes = json_list_data.count(); 109 167 double per_vol = 100.0/(double)num_volumes; 110 168 111 //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100)); 112 113 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 114 115 System.err.println(); 116 System.err.println(); 117 System.err.println(); 118 System.err.println("****##### _input_dir = " + _input_dir); 119 System.err.println(); 120 System.err.println(); 121 System.err.println(); 122 123 boolean icu_tokenize = Boolean.getBoolean("wcsa-ef-ingest.icu-tokenize"); 124 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 125 126 PerVolumeJSON per_vol_json = new 
PerVolumeJSON(_input_dir,_whitelist_filename, 127 _solr_url,_output_dir,_verbosity, progress_accum,per_vol, 128 icu_tokenize,strict_file_io); 129 130 json_list_data.foreach(per_vol_json); 131 132 long num_ids = num_volumes; 133 134 System.out.println(""); 135 System.out.println("############"); 136 System.out.println("# Number of volume ids: " + num_ids); 137 System.out.println("############"); 138 System.out.println(""); 139 140 jsc.close(); 141 } 142 143 144 145 public void execPerPage() 146 { 147 String spark_app_name = generateSparkAppName("Per Page"); 148 149 SparkConf conf = new SparkConf().setAppName(spark_app_name); 150 JavaSparkContext jsc = new JavaSparkContext(conf); 151 152 if (_verbosity >= 2) { 153 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 154 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 155 } 156 157 int num_partitions = Integer.getInteger("wcsa-ef-ingest.num-partitions", DEFAULT_NUM_PARTITIONS); 158 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,num_partitions).cache(); 159 160 long num_volumes = json_list_data.count(); 161 double per_vol = 100.0/(double)num_volumes; 162 163 //JavaRDD<String> json_list_data_rp = json_list_data.repartition((int)(num_volumes/100)); 169 int num_partitions = (int)(num_volumes/files_per_partition)+1; 170 JavaRDD<String> json_list_data_rp = json_list_data.repartition(num_partitions); 164 171 165 172 DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); … … 173 180 per_vol_progress_accum,per_vol, 174 181 icu_tokenize,strict_file_io); 175 JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache(); 182 //JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap).cache(); 183 JavaRDD<JSONObject> per_page_jsonobjects = json_list_data_rp.flatMap(paged_solr_json_flatmap); 176 184 177 185 //long num_page_ids = 
per_page_jsonobjects.count(); // trigger lazy eval of: flatmap:per-vol
Note: See TracChangeset for help on using the changeset viewer.