- Timestamp:
- 2016-11-02T21:34:47+13:00 (7 years ago)
- Location:
- other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/JSONClusterFileIO.java
r30996 r31045 9 9 protected static JSONObject readJSONFile(String filename) 10 10 { 11 StringBuilder sb = new StringBuilder();12 11 JSONObject json_obj = null; 12 13 13 try { 14 14 StringBuilder sb = new StringBuilder(); 15 15 16 String str; 16 17 BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(filename); … … 20 21 21 22 br.close(); 23 24 json_obj = new JSONObject(sb.toString()); 22 25 } 23 26 catch (Exception e) { 24 27 e.printStackTrace(); 25 28 } 26 27 JSONObject json_obj = new JSONObject(sb.toString());28 29 29 30 30 return json_obj; -
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerPageJSONFlatmap.java
r31030 r31045 1 1 package org.hathitrust.extractedfeatures; 2 2 3 import java.io.IOException; 3 4 import java.util.ArrayList; 4 5 import java.util.Iterator; … … 31 32 protected double _progress_step; 32 33 34 boolean _strict_file_io; 35 33 36 public PerPageJSONFlatmap(String input_dir, String solr_url, String output_dir, int verbosity, 34 DoubleAccumulator progress_accum, double progress_step) 37 DoubleAccumulator progress_accum, double progress_step, 38 boolean strict_file_io) 35 39 { 36 40 _input_dir = input_dir; … … 41 45 _progress_accum = progress_accum; 42 46 _progress_step = progress_step; 47 48 _strict_file_io = strict_file_io; 43 49 } 44 50 45 public Iterator<JSONObject> call(String json_file_in) 51 public Iterator<JSONObject> call(String json_file_in) throws IOException 46 52 //public void call(String json_file_in) 47 53 { 48 JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(_input_dir + "/" + json_file_in); 54 String full_json_file_in = _input_dir + "/" + json_file_in; 55 JSONObject extracted_feature_record = JSONClusterFileIO.readJSONFile(full_json_file_in); 49 56 50 String volume_id = extracted_feature_record.getString("id");57 ArrayList<JSONObject> json_pages = new ArrayList<JSONObject>(); 51 58 52 //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); 53 //String title= ef_metadata.getString("title"); 54 55 JSONObject ef_features = extracted_feature_record.getJSONObject("features"); 56 57 58 int ef_page_count = ef_features.getInt("pageCount"); 59 60 if (_verbosity >= 1) { 61 System.out.println("Processing: " + json_file_in); 62 System.out.println(" pageCount = " + ef_page_count); 59 if (extracted_feature_record != null) { 60 String volume_id = extracted_feature_record.getString("id"); 61 62 //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); 63 //String title= ef_metadata.getString("title"); 64 65 JSONObject ef_features = extracted_feature_record.getJSONObject("features"); 66 67 68 
int ef_page_count = ef_features.getInt("pageCount"); 69 70 if (_verbosity >= 1) { 71 System.out.println("Processing: " + json_file_in); 72 System.out.println(" pageCount = " + ef_page_count); 73 } 74 75 JSONArray ef_pages = ef_features.getJSONArray("pages"); 76 int ef_num_pages = ef_pages.length(); 77 if (ef_num_pages != ef_page_count) { 78 System.err.println("Warning: number of page elements in JSON (" + ef_num_pages + ")" 79 +" does not match 'pageCount' metadata (" + ef_page_count + ")"); 80 } 81 82 // Make directory for page-level JSON output 83 String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2"); 84 String page_json_dir = json_dir + "/pages"; 85 ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 86 87 if (_verbosity >= 2) { 88 System.out.print(" Pages: "); 89 } 90 91 for (int i = 0; i < ef_page_count; i++) { 92 String formatted_i = String.format("page-%06d", i); 93 String page_id = volume_id + "." + formatted_i; 94 95 if (_verbosity >= 2) { 96 if (i>0) { 97 System.out.print(", "); 98 } 99 System.out.print(page_id); 100 } 101 102 String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; 103 104 if (i==(ef_page_count-1)) { 105 if (_verbosity >= 2) { 106 System.out.println(); 107 } 108 System.out.println("Sample output JSON page file: " + output_json_bz2); 109 } 110 111 JSONObject ef_page = ef_pages.getJSONObject(i); 112 113 if (ef_page != null) { 114 // Convert to Solr add form 115 JSONObject solr_add_doc_json = SolrDocJSON.generateSolrDocJSON(volume_id, page_id, ef_page); 116 solr_add_doc_json.put("filename_json_bz2", output_json_bz2); 117 118 json_pages.add(solr_add_doc_json); 119 120 121 } 122 else { 123 System.err.println("Skipping: " + page_id); 124 } 125 126 } 63 127 } 64 65 JSONArray ef_pages = ef_features.getJSONArray("pages"); 66 int ef_num_pages = ef_pages.length(); 67 68 // Make directory for page-level JSON output 69 String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2"); 70 String 
page_json_dir = json_dir + "/pages"; 71 ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 72 73 if (_verbosity >= 2) { 74 System.out.print(" Pages: "); 75 } 76 77 ArrayList<JSONObject> json_pages = new ArrayList<JSONObject>(ef_num_pages); 78 for (int i = 0; i < ef_page_count; i++) { 79 String formatted_i = String.format("page-%06d", i); 80 String page_id = volume_id + "." + formatted_i; 81 82 if (_verbosity >= 2) { 83 if (i>0) { 84 System.out.print(", "); 85 } 86 System.out.print(page_id); 87 } 88 89 String output_json_bz2 = page_json_dir +"/" + formatted_i + ".json.bz2"; 90 //ids.add(output_json_bz2); 91 92 if (i==(ef_page_count-1)) { 93 if (_verbosity >= 2) { 94 System.out.println(); 95 } 96 System.out.println("Sample output JSON page file: " + output_json_bz2); 97 } 98 99 JSONObject ef_page = ef_pages.getJSONObject(i); 100 101 if (ef_page != null) { 102 // Convert to Solr add form 103 JSONObject solr_add_doc_json = SolrDocJSON.generateSolrDocJSON(volume_id, page_id, ef_page); 104 solr_add_doc_json.put("filename_json_bz2", output_json_bz2); 105 106 json_pages.add(solr_add_doc_json); 107 108 128 else { 129 // File did not exist, or could not be parsed 130 String mess = "Failed to read in bzipped JSON file '" + full_json_file_in + "'"; 131 if (_strict_file_io) { 132 throw new IOException(mess); 109 133 } 110 134 else { 111 System.err.println("Skipping: " + page_id); 135 System.err.println("Warning: " + mess); 136 System.out.println("Warning: " + mess); 112 137 } 113 114 138 } 115 116 117 //ids.add(volume_id); 139 118 140 _progress_accum.add(_progress_step); 119 141 120 //return ids.iterator();121 142 return json_pages.iterator(); 122 143 } -
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/PerPageJSONMap.java
r31028 r31045 24 24 protected long _progress_step; 25 25 26 27 26 public PerPageJSONMap(String input_dir, ArrayList<String> solr_endpoints, String output_dir, int verbosity, 28 27 LongAccumulator progress_accum, long progress_step) … … 37 36 _progress_accum = progress_accum; 38 37 _progress_step = progress_step; 38 39 39 } 40 40 -
other-projects/hathitrust/wcsa/extracted-features-solr/trunk/solr-ingest/src/main/java/org/hathitrust/extractedfeatures/ProcessForSolrIngest.java
r31041 r31045 65 65 } 66 66 67 public void execPerVolume()68 {69 String spark_app_name = generateSparkAppName("Per Volume");70 71 SparkConf conf = new SparkConf().setAppName(spark_app_name);72 JavaSparkContext jsc = new JavaSparkContext(conf);73 74 if (_verbosity >= 2) {75 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());76 System.out.println("Default Parallelism: " + jsc.defaultParallelism());77 }78 79 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();80 81 long num_volumes = json_list_data.count();82 double per_vol = 100.0/(double)num_volumes;83 84 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent");85 86 System.err.println();87 System.err.println();88 System.err.println();89 System.err.println("****##### _input_dir = " + _input_dir);90 System.err.println();91 System.err.println();92 System.err.println();93 94 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol);95 96 json_list_data.foreach(per_vol_json);97 98 long num_ids = num_volumes;99 100 System.out.println("");101 System.out.println("############");102 System.out.println("# Number of volume ids: " + num_ids);103 System.out.println("############");104 System.out.println("");105 106 jsc.close();107 }108 67 public ArrayList<String> extrapolateSolrEndpoints() 109 68 { … … 127 86 } 128 87 88 public void execPerVolume() 89 { 90 String spark_app_name = generateSparkAppName("Per Volume"); 91 92 SparkConf conf = new SparkConf().setAppName(spark_app_name); 93 JavaSparkContext jsc = new JavaSparkContext(conf); 94 95 if (_verbosity >= 2) { 96 System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 97 System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 98 } 99 100 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 101 102 long num_volumes = json_list_data.count(); 103 double 
per_vol = 100.0/(double)num_volumes; 104 105 DoubleAccumulator progress_accum = jsc.sc().doubleAccumulator("Progress Percent"); 106 107 System.err.println(); 108 System.err.println(); 109 System.err.println(); 110 System.err.println("****##### _input_dir = " + _input_dir); 111 System.err.println(); 112 System.err.println(); 113 System.err.println(); 114 115 PerVolumeJSON per_vol_json = new PerVolumeJSON(_input_dir,_solr_url,_output_dir,_verbosity, progress_accum,per_vol); 116 117 json_list_data.foreach(per_vol_json); 118 119 long num_ids = num_volumes; 120 121 System.out.println(""); 122 System.out.println("############"); 123 System.out.println("# Number of volume ids: " + num_ids); 124 System.out.println("############"); 125 System.out.println(""); 126 127 jsc.close(); 128 } 129 130 131 129 132 public void execPerPage() 130 133 { … … 146 149 DoubleAccumulator per_vol_progress_accum = jsc.sc().doubleAccumulator("Per Volume Progress Percent"); 147 150 151 //String strict_file_io_str = System.getProperty("wcsa-ef-ingest.strict-file-io","true"); 152 boolean strict_file_io = Boolean.getBoolean("wcsa-ef-ingest.strict-file-io"); 153 148 154 PerPageJSONFlatmap paged_solr_json_flatmap 149 = new PerPageJSONFlatmap(_input_dir,_solr_url,_output_dir,_verbosity, per_vol_progress_accum,per_vol); 155 = new PerPageJSONFlatmap(_input_dir,_solr_url,_output_dir,_verbosity, 156 per_vol_progress_accum,per_vol, 157 strict_file_io); 150 158 JavaRDD<JSONObject> per_page_jsonobjects = json_list_data.flatMap(paged_solr_json_flatmap).cache(); 151 159 … … 155 163 ArrayList<String> solr_endpoints = extrapolateSolrEndpoints(); 156 164 165 157 166 PerPageJSONMap paged_json_id_map 158 = new PerPageJSONMap(_input_dir,solr_endpoints,_output_dir,_verbosity, per_page_progress_accum,1); 167 = new PerPageJSONMap(_input_dir,solr_endpoints,_output_dir,_verbosity, 168 per_page_progress_accum,1); 159 169 JavaRDD<String> per_page_ids = per_page_jsonobjects.map(paged_json_id_map); 160 170
Note: See TracChangeset for help on using the changeset viewer.