Changeset 30945
- Timestamp: 2016-10-26T15:37:24+13:00
- Location: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
- Files: 3 edited
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java
r30941 → r30945

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
 -import org.apache.spark.api.java.JavaSparkContext;

  public class ClusterFileIO {
 …
      */

 -    public static FileSystem getFileSystemInstance(String input_dir)
 +    protected static FileSystem getFileSystemInstance(String input_dir)
      {
          if (__fs == null) {
 …
      }

 -    protected static boolean exists(String file)
 +    public static boolean exists(String file)
      {
          FileSystem fs = getFileSystemInstance(file);
 …
          return exists;
      }
 -    protected static BufferedInputStream getBufferedInputStream(String fileIn)
 +
 +    public static String removeSuffix(String file,String suffix)
 +    {
 +        return file.substring(0,file.length() - suffix.length());
 +    }
 +
 +    public static boolean createDirectoryAll(String dir)
 +    {
 +        FileSystem fs = getFileSystemInstance(dir);
 +        boolean created_dir = false;
 +
 +        if (!exists(dir)) {
 +            try {
 +                URI uri = new URI(dir);
 +                Path path = new Path(uri);
 +                fs.mkdirs(path);
 +                created_dir = true;
 +            } catch (URISyntaxException e) {
 +                e.printStackTrace();
 +            } catch (IOException e) {
 +                e.printStackTrace();
 +            }
 +        }
 +
 +        return created_dir;
 +    }
 +
 +    public static BufferedInputStream getBufferedInputStream(String fileIn)
          throws IOException
      {
 …
      }

 -    protected static BufferedOutputStream getBufferedOutputStream(String fileOut)
 +    public static BufferedOutputStream getBufferedOutputStream(String fileOut)
          throws IOException
      {
 …
      }

 -    protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
 +    public static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
          throws IOException, CompressorException
      {
 …
      }

 -    protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
 +    public static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
          throws IOException, CompressorException
      {
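The two helpers added here are used together by PagedJSON later in this same changeset: removeSuffix() turns a volume's compressed JSON filename into a directory name, and createDirectoryAll() creates that directory on the shared filesystem. Below is a minimal usage sketch, assuming ClusterFileIO and its Hadoop dependencies are on the classpath; the volume filename and output root are invented example values, not paths taken from the changeset.

// Usage sketch only: the filename and output root are invented examples.
// createDirectoryAll() returns true only if it actually created the directory
// (false if it already existed or the attempt failed).
public class ClusterFileIOSketch {
    public static void main(String[] args) {
        String json_file_in = "mdp.39015012345678.json.bz2";           // assumed example volume file
        String output_root  = "hdfs://localhost:9000/user/ht/ef-out";  // assumed example output root

        // Strip the ".json.bz2" suffix to get a per-volume directory name ...
        String json_dir = ClusterFileIO.removeSuffix(json_file_in, ".json.bz2");

        // ... and create a "pages" subdirectory under the output root for the
        // page-level JSON files that PagedJSON generates.
        boolean created = ClusterFileIO.createDirectoryAll(output_root + "/" + json_dir + "/pages");
        System.out.println("Created " + json_dir + "/pages: " + created);
    }
}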
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java
r30942 → r30945

      protected String _input_dir;
 +    protected String _output_dir;
      protected int _verbosity;

 -    public PagedJSON(String input_dir, int verbosity)
 +    public PagedJSON(String input_dir, String output_dir, int verbosity)
      {
 -        _input_dir = input_dir;
 -        _verbosity = verbosity;
 +        _input_dir = input_dir;
 +        _output_dir = output_dir;
 +        _verbosity = verbosity;
      }
 …
              while ((str = br.readLine()) != null) {
                  sb.append(str);
 -                //System.out.println(str);
              }

              br.close();
 -
 -            //System.err.println("*****" + sb.toString());
 -
 -            /*
 -            List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
 -
 -
 -            for (String line : lines) {
 -                sb.append(line);
 -
 -            }
 -            */
 -
          }
          catch (Exception e) {
 …
          JSONObject json_obj = new JSONObject(sb.toString());

 -
          return json_obj;
 -
 -        //return sb.toString();
      }

 …
          String id = extracted_feature_record.getString("id");

 -        JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
 +        //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata");
          JSONObject ef_features = extracted_feature_record.getJSONObject("features");

 …
          int ef_num_pages = ef_pages.length();

 +        // Make directory for page-level JSON output
 +        String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2");
 +        String page_json_dir = json_dir + "/pages";
 +        //ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir);
 +        System.out.println("mkdir: " + _output_dir + "/" + page_json_dir);
 +
          ArrayList<String> ids = new ArrayList<String>(ef_num_pages);
          for (int i = 0; i < ef_page_count; i++) {
 -            ids.add(id + "." + i);
 +            String formatted_i = String.format("page-%06d", i);
 +            String page_id = id + "." + formatted_i;
 +
 +            if (_verbosity >= 2) {
 +                System.out.println("  Page: " + page_id);
 +            }
 +
 +            // create JSON obj of just the page (for now)
 +            // write it out
 +
 +            ids.add(page_json_dir +"/" + page_id + ".json.bz2");
 +
 +            if (i==0) {
 +                System.out.println("Sample output JSON page file: " + page_json_dir +"/" + page_id + ".json.bz2");
 +            }
          }
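The new page-id scheme zero-pads the page index (page-%06d) so the generated per-page filenames sort in page order. A self-contained sketch of just the naming logic, using an invented volume id:

// Naming-scheme sketch only; the volume id is an invented example.
public class PageIdSketch {
    public static void main(String[] args) {
        String id = "mdp.39015012345678";        // assumed example HathiTrust volume id
        String page_json_dir = id + "/pages";    // directory derived from <id>.json.bz2

        for (int i = 0; i < 3; i++) {
            String formatted_i = String.format("page-%06d", i);  // zero-padded page index
            String page_id = id + "." + formatted_i;
            System.out.println(page_json_dir + "/" + page_id + ".json.bz2");
        }
        // Prints, e.g.:
        //   mdp.39015012345678/pages/mdp.39015012345678.page-000000.json.bz2
        //   mdp.39015012345678/pages/mdp.39015012345678.page-000001.json.bz2
        //   mdp.39015012345678/pages/mdp.39015012345678.page-000002.json.bz2
    }
}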
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java
r30944 → r30945

  package org.hathitrust;

 -import java.io.IOException;
  import java.io.Serializable;
 -import java.nio.charset.StandardCharsets;
 -import java.nio.file.Files;
 -import java.nio.file.Path;
 -import java.nio.file.Paths;
 -import java.util.List;
 -
  import org.apache.commons.cli.*;

  import org.apache.spark.api.java.*;
  import org.apache.spark.SparkConf;
 -import org.apache.spark.api.java.function.Function;

  public class PrepareForIngest implements Serializable
 …
      private static final long serialVersionUID = 1L;

 +    public static final int NUM_PARTITIONS = 6; // default would appear to be 2
 +
      protected String _input_dir;
      protected String _json_list_filename;
 …
          SparkConf conf = new SparkConf().setAppName(spark_app_name);
          JavaSparkContext jsc = new JavaSparkContext(conf);
 -        //ClusterFileIO.init(_input_dir);

 -        // Check output directory exists, and create it if not
 -
 -
 -        if (_verbosity >= 1) {
 +        if (_verbosity >= 2) {
              System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions());
              System.out.println("Default Parallelism: " + jsc.defaultParallelism());
          }

 -        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename, 6).cache();
 +        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache();

 -        JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir,_verbosity));
 +        PagedJSON paged_json = new PagedJSON(_input_dir,_output_dir,_verbosity);
 +        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache();

 +        json_ids.saveAsTextFile("foo");

 -        //long numAs = json_list_data.filter(new ContainsA()).count();
 -
 -
 -        /*
 -        long numBs = json_list_data.filter(new Function<String, Boolean>() {
 -            public Boolean call(String s) { return s.contains("b"); }
 -        }).count();
 -
 -        System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs);
 -        */
          long num_ids = json_ids.count();
          System.out.println("");
 …

          //.withType(Integer.class)
 -
 +        /*
          options.addOption(OptionBuilder.withLongOpt("verbosity")
                  .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]")
 …
                  .isRequired(false)
                  .create());
 -
 +        */
          //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use");
          //num_cores_opt.setRequired(false);
          //options.addOption(num_cores_opt);

 +        Option verbosity_opt = new Option("v", "verbosity", true,
 +                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
 +        verbosity_opt.setRequired(false);
 +        options.addOption(verbosity_opt);
 +
          //CommandLineParser parser = new DefaultParser(); // 1.3 and above
 -        CommandLineParser parser = new GnuParser();
 +
 +        // need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark
 +        CommandLineParser parser = new GnuParser();
          HelpFormatter formatter = new HelpFormatter();
          CommandLine cmd;
 …

          //cmd.hasOption("json-filelist")
 +
          String verbosity_str = cmd.getOptionValue("verbosity","0");
          int verbosity = Integer.parseInt(verbosity_str);
 -
 -        //System.out.println(inputFilePath);
 -        //System.out.println(outputFilePath);
 -

          String[] filtered_args = cmd.getArgs();
 …
          String output_dir = filtered_args[2];

 -
 -        //String json_list_filename = cmd.getArgs()[0]; // args[0];
 -        //String json_list_filename = args[0];
 -        //int num_cores = 2;
 -
          PrepareForIngest prep_for_ingest = new PrepareForIngest(input_dir,json_list_filename,output_dir,verbosity);
          prep_for_ingest.exec();
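With the OptionBuilder block commented out in favour of the plain Option constructor (so the code works against the Commons CLI 1.2 JAR bundled with Hadoop/Spark), option parsing in main() boils down to the pattern below. This is a sketch rather than the changeset's code verbatim: only the verbosity option and the positional arguments are shown, and the sample command-line values are invented.

// CLI 1.2-compatible parsing sketch; the sample argument values are invented.
import org.apache.commons.cli.*;

public class CliParseSketch {
    public static void main(String[] args) throws ParseException {
        Options options = new Options();

        Option verbosity_opt = new Option("v", "verbosity", true,
                "Set to control the level of debugging output [0=none, 1=some, 2=lots]");
        verbosity_opt.setRequired(false);
        options.addOption(verbosity_opt);

        CommandLineParser parser = new GnuParser();   // DefaultParser would need CLI 1.3+

        String[] sample_args = {
                "--verbosity", "2",
                "hdfs://localhost:9000/user/ht/json-files",  // input_dir (assumed)
                "json-filelist.txt",                         // json_list_filename (assumed)
                "hdfs://localhost:9000/user/ht/ef-out"       // output_dir (assumed)
        };
        CommandLine cmd = parser.parse(options, sample_args);

        int verbosity = Integer.parseInt(cmd.getOptionValue("verbosity", "0"));
        String[] filtered_args = cmd.getArgs();  // the three positional arguments

        System.out.println("verbosity=" + verbosity + ", positional args=" + filtered_args.length);
    }
}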