Changeset 30937 for other-projects/hathitrust
- Timestamp:
- 2016-10-26T13:44:38+13:00 (7 years ago)
- Location:
- other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
- Files:
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java
r30933 r30937 1 1 package org.hathitrust; 2 2 3 import java.io.BufferedInputStream;4 3 import java.io.BufferedReader; 5 import java.io.FileInputStream;6 import java.io.IOException;7 import java.io.InputStreamReader;8 import java.net.URI;9 4 import java.util.ArrayList; 10 5 import java.util.Iterator; 11 6 12 import org.apache.commons.compress.compressors.CompressorException;13 import org.apache.commons.compress.compressors.CompressorInputStream;14 import org.apache.commons.compress.compressors.CompressorStreamFactory;15 import org.apache.hadoop.conf.Configuration;16 import org.apache.hadoop.fs.FSDataInputStream;17 import org.apache.hadoop.fs.FileSystem;18 import org.apache.hadoop.fs.Path;19 7 import org.apache.spark.api.java.function.FlatMapFunction; 20 8 import org.json.JSONArray; … … 42 30 } 43 31 44 protected static BufferedInputStream getBufferedInputStream(String fileIn)45 throws IOException46 {47 BufferedInputStream bis = null;48 49 if (fileIn.startsWith("hdfs://")) {50 URI uri = URI.create (fileIn);51 Configuration conf = new Configuration();52 FileSystem file = FileSystem.get(uri, conf);53 FSDataInputStream fin = file.open(new Path(uri));54 55 bis = new BufferedInputStream(fin);56 }57 else {58 // Trim 'file://' off the front59 60 String local_file_in = fileIn;61 if (local_file_in.startsWith("file://")) {62 local_file_in = fileIn.substring("file://".length());63 }64 FileInputStream fin = new FileInputStream(local_file_in);65 bis = new BufferedInputStream(fin);66 }67 68 return bis;69 70 }71 protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)72 throws IOException, CompressorException73 {74 BufferedInputStream bis = getBufferedInputStream(fileIn);75 CompressorInputStream comp_input = new CompressorStreamFactory().createCompressorInputStream(bis);76 BufferedReader br = new BufferedReader(new InputStreamReader(comp_input,"UTF8"));77 return br;78 }79 80 32 protected JSONObject readJSONFile(String filename) 81 33 { … … 87 39 88 40 
String str; 89 BufferedReader br = getBufferedReaderForCompressedFile(_input_dir + "/" + filename);41 BufferedReader br = ClusterFileIO.getBufferedReaderForCompressedFile(_input_dir + "/" + filename); 90 42 while ((str = br.readLine()) != null) { 91 43 sb.append(str); … … 120 72 } 121 73 122 public Iterator<String> call(String s)74 public Iterator<String> call(String json_file_in) 123 75 { 124 JSONObject extracted_feature_record = readJSONFile(s); 76 JSONObject extracted_feature_record = readJSONFile(json_file_in); 77 78 // Check output directory for volume exists, and create it if not 79 125 80 126 81 String id = extracted_feature_record.getString("id"); -
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java
r30934 r30937 19 19 private static final long serialVersionUID = 1L; 20 20 21 protected String _input_dir; 22 protected String _json_list_filename; 23 protected String _output_dir; 24 protected int _verbosity; 25 21 protected String _input_dir; 22 protected String _json_list_filename; 23 protected String _output_dir; 24 25 protected int _verbosity; 26 26 27 public PrepareForIngest(String input_dir, String json_list_filename, String output_dir, int verbosity) 27 28 { … … 34 35 public void exec() 35 36 { 36 S parkConf conf = new SparkConf().setAppName("HTRC-EF: Prepare for Solr Ingest");37 JavaSparkContext sc = new JavaSparkContext(conf);37 String spark_app_name = "HathiTrust Extract Features: Prepare for Solr Ingest"; 38 spark_app_name += "[" + _json_list_filename + "]"; 38 39 39 JavaRDD<String> json_list_data = sc.textFile(_json_list_filename).cache(); 40 SparkConf conf = new SparkConf().setAppName(spark_app_name); 41 JavaSparkContext jsc = new JavaSparkContext(conf); 42 ClusterFileIO.init(_input_dir); 43 44 // Check output directory exists, and create it if not 45 46 47 JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename).cache(); 40 48 41 49 JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir)); 42 50 43 51 44 52 //long numAs = json_list_data.filter(new ContainsA()).count(); 45 53 46 47 /*54 55 /* 48 56 long numBs = json_list_data.filter(new Function<String, Boolean>() { 49 57 public Boolean call(String s) { return s.contains("b"); } … … 51 59 52 60 System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs); 53 */61 */ 54 62 long num_ids = json_ids.count(); 55 63 System.out.println(""); … … 58 66 System.out.println("############"); 59 67 System.out.println(""); 60 61 sc.close();68 69 jsc.close(); 62 70 } 63 71 … … 90 98 catch (ParseException e) { 91 99 System.err.println(e.getMessage()); 92 //System.err.println("Usage: RUN.bat [options] json-file-list.txt input-dir output-dir"); 93 
formatter.printHelp("RUN.bash/RUN.bat [options] json-file-list.txt input-dir output-dir", options); 94 //System.err.println(" Where 'filename.txt' contains a list of JSON files, one per line,"); 95 //System.err.println(" which use the HathiTrust Extracted Feature JSON format"); 96 100 formatter.printHelp("RUN.bash [options] json-file-list.txt input-dir output-dir", options); 97 101 System.exit(1); 98 102 return; … … 110 114 111 115 116 String[] filtered_args = cmd.getArgs(); 112 117 113 String[] filtered_args = cmd.getArgs();114 115 118 if (filtered_args.length != 3) { 116 //System.err.println("Usage: RUN.bat [options] json-filelist.txt input-dir output-dir"); 117 formatter.printHelp("RUN.bash/RUN.bat [options] json-filelist.txt input-dir output-dir", options); 118 119 //System.err.println("Usage: RUN.bat [options] input-dir output-dir"); 120 //System.err.println(" Where 'filename.txt' contains a list of JSON files, one per line,"); 121 //System.err.println(" which use the HathiTrust Extracted Feature JSON format"); 122 System.exit(1); 119 formatter.printHelp("RUN.bash [options] json-filelist.txt input-dir output-dir", options); 120 System.exit(1); 123 121 } 124 122 String json_list_filename = filtered_args[0]; 125 123 String input_dir = filtered_args[1]; 126 124 String output_dir = filtered_args[2]; 127 125 128 126 129 127 //String json_list_filename = cmd.getArgs()[0]; // args[0];
Note: See TracChangeset for help on using the changeset viewer.