Changeset 30945 for other-projects

Show
Ignore:
Timestamp:
26.10.2016 15:37:24 (3 years ago)
Author:
davidb
Message:

Getting closer to writing out JSON files

Location:
other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust
Files:
3 modified

Legend:

Unmodified
Added
Removed
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java

    r30941 r30945  
    2121import org.apache.hadoop.fs.FileSystem; 
    2222import org.apache.hadoop.fs.Path; 
    23 import org.apache.spark.api.java.JavaSparkContext; 
    2423 
    2524public class ClusterFileIO { 
     
    5857    */ 
    5958     
    60     public static FileSystem getFileSystemInstance(String input_dir) 
     59    protected static FileSystem getFileSystemInstance(String input_dir) 
    6160    { 
    6261        if (__fs == null) { 
     
    8281    } 
    8382     
    84     protected static boolean exists(String file)  
     83    public static boolean exists(String file)  
    8584    { 
    8685        FileSystem fs = getFileSystemInstance(file); 
     
    10099        return exists; 
    101100    } 
    102     protected static BufferedInputStream getBufferedInputStream(String fileIn)  
     101     
     102    public static String removeSuffix(String file,String suffix) 
     103    { 
     104        return file.substring(0,file.length() - suffix.length()); 
     105    } 
     106     
     107    public static boolean createDirectoryAll(String dir)  
     108    { 
     109        FileSystem fs = getFileSystemInstance(dir); 
     110        boolean created_dir = false; 
     111 
     112        if (!exists(dir)) { 
     113            try { 
     114                URI uri = new URI(dir); 
     115                Path path = new Path(uri); 
     116                fs.mkdirs(path); 
     117                created_dir = true;  
     118            } catch (URISyntaxException e) { 
     119                e.printStackTrace(); 
     120            } catch (IOException e) { 
     121                e.printStackTrace(); 
     122            } 
     123        } 
     124         
     125        return created_dir; 
     126    } 
     127     
     128    public static BufferedInputStream getBufferedInputStream(String fileIn)  
    103129            throws IOException  
    104130    { 
     
    137163    } 
    138164 
    139     protected static BufferedOutputStream getBufferedOutputStream(String fileOut)  
     165    public static BufferedOutputStream getBufferedOutputStream(String fileOut)  
    140166            throws IOException  
    141167    { 
     
    163189    } 
    164190     
    165     protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)  
     191    public static BufferedReader getBufferedReaderForCompressedFile(String fileIn)  
    166192            throws IOException, CompressorException  
    167193    { 
     
    172198    } 
    173199 
    174     protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)  
     200    public static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)  
    175201            throws IOException, CompressorException  
    176202    { 
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PagedJSON.java

    r30942 r30945  
    2424 
    2525    protected String _input_dir; 
     26    protected String _output_dir; 
    2627    protected int    _verbosity; 
    2728     
    28     public PagedJSON(String input_dir, int verbosity) 
     29    public PagedJSON(String input_dir, String output_dir, int verbosity) 
    2930    { 
    30         _input_dir = input_dir; 
    31         _verbosity = verbosity; 
     31        _input_dir  = input_dir; 
     32        _output_dir = output_dir; 
     33        _verbosity  = verbosity; 
    3234    } 
    3335     
     
    4446            while ((str = br.readLine()) != null) { 
    4547                sb.append(str); 
    46                 //System.out.println(str); 
    4748            } 
    4849 
    4950            br.close(); 
    50              
    51             //System.err.println("*****" + sb.toString()); 
    52                     
    53             /* 
    54             List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8); 
    55  
    56  
    57             for (String line : lines) { 
    58                 sb.append(line); 
    59  
    60             } 
    61             */ 
    62              
    6351        }  
    6452        catch (Exception e) { 
     
    6856        JSONObject json_obj = new JSONObject(sb.toString()); 
    6957         
    70          
    7158        return json_obj; 
    72          
    73         //return sb.toString(); 
    7459    } 
    7560 
     
    8368        String id = extracted_feature_record.getString("id"); 
    8469         
    85         JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); 
     70        //JSONObject ef_metadata = extracted_feature_record.getJSONObject("metadata"); 
    8671        JSONObject ef_features = extracted_feature_record.getJSONObject("features"); 
    8772         
     
    9782        int ef_num_pages = ef_pages.length(); 
    9883         
     84        // Make directory for page-level JSON output 
     85        String json_dir = ClusterFileIO.removeSuffix(json_file_in,".json.bz2"); 
     86        String page_json_dir = json_dir + "/pages"; 
     87        //ClusterFileIO.createDirectoryAll(_output_dir + "/" + page_json_dir); 
     88        System.out.println("mkdir: " + _output_dir + "/" + page_json_dir); 
     89         
    9990        ArrayList<String> ids = new ArrayList<String>(ef_num_pages); 
    10091        for (int i = 0; i < ef_page_count; i++) { 
    101             ids.add(id + "." + i); 
     92            String formatted_i = String.format("page-%06d", i); 
     93            String page_id = id + "." + formatted_i; 
     94         
     95            if (_verbosity >= 2) { 
     96              System.out.println("  Page: " + page_id); 
     97            } 
     98             
     99            // create JSON obj of just the page (for now) 
     100            // write it out 
     101             
     102            ids.add(page_json_dir +"/" + page_id + ".json.bz2"); 
     103             
     104            if (i==0) { 
     105                System.out.println("Sample output JSON page file: " + page_json_dir +"/" + page_id + ".json.bz2"); 
     106            } 
    102107        } 
    103108         
  • other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/PrepareForIngest.java

    r30944 r30945  
    11package org.hathitrust; 
    22 
    3 import java.io.IOException; 
    43import java.io.Serializable; 
    5 import java.nio.charset.StandardCharsets; 
    6 import java.nio.file.Files; 
    7 import java.nio.file.Path; 
    8 import java.nio.file.Paths; 
    9 import java.util.List; 
    10  
    114import org.apache.commons.cli.*; 
    125 
    136import org.apache.spark.api.java.*; 
    147import org.apache.spark.SparkConf; 
    15 import org.apache.spark.api.java.function.Function; 
    168 
    179public class PrepareForIngest implements Serializable 
     
    1911    private static final long serialVersionUID = 1L; 
    2012 
     13    public static final int NUM_PARTITIONS = 6; // default would appear to be 2 
     14     
    2115    protected String _input_dir; 
    2216    protected String _json_list_filename; 
     
    4034        SparkConf conf = new SparkConf().setAppName(spark_app_name); 
    4135        JavaSparkContext jsc = new JavaSparkContext(conf); 
    42         //ClusterFileIO.init(_input_dir); 
    4336         
    44         // Check output directory exists, and create it if not 
    45          
    46          
    47         if (_verbosity >= 1) { 
     37        if (_verbosity >= 2) { 
    4838            System.out.println("Default Minimum Partions: " + jsc.defaultMinPartitions()); 
    4939            System.out.println("Default Parallelism: " + jsc.defaultParallelism()); 
    5040        } 
    5141                 
    52         JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,6).cache(); 
     42        JavaRDD<String> json_list_data = jsc.textFile(_json_list_filename,NUM_PARTITIONS).cache(); 
    5343 
    54         JavaRDD<String> json_ids = json_list_data.flatMap(new PagedJSON(_input_dir,_verbosity)); 
     44        PagedJSON paged_json = new PagedJSON(_input_dir,_output_dir,_verbosity); 
     45        JavaRDD<String> json_ids = json_list_data.flatMap(paged_json).cache(); 
    5546 
     47        json_ids.saveAsTextFile("foo"); 
    5648 
    57         //long numAs = json_list_data.filter(new ContainsA()).count(); 
    58  
    59  
    60         /* 
    61         long numBs = json_list_data.filter(new Function<String, Boolean>() { 
    62             public Boolean call(String s) { return s.contains("b"); } 
    63         }).count(); 
    64  
    65         System.out.println("#### Lines with a: " + numAs + ", lines with b: " + numBs); 
    66          */ 
    6749        long num_ids = json_ids.count(); 
    6850        System.out.println(""); 
     
    8163 
    8264        //.withType(Integer.class) 
    83  
     65/* 
    8466        options.addOption(OptionBuilder.withLongOpt("verbosity") 
    8567                .withDescription("Set to control the level of debugging output [0=none, 1=some, 2=lots]") 
     
    8870                .isRequired(false) 
    8971                .create()); 
    90  
     72*/ 
    9173        //Option num_cores_opt = new Option("n", "num-cores", true, "Number of cores to use"); 
    9274        //num_cores_opt.setRequired(false); 
    9375        //options.addOption(num_cores_opt); 
    9476 
     77        Option verbosity_opt = new Option("v", "verbosity", true,  
     78                "Set to control the level of debugging output [0=none, 1=some, 2=lots]"); 
     79        verbosity_opt.setRequired(false); 
     80        options.addOption(verbosity_opt); 
     81         
    9582        //CommandLineParser parser = new DefaultParser(); // 1.3 and above 
    96         CommandLineParser parser = new GnuParser(); 
     83         
     84        // need to work with CLI v1.2 as this is the JAR that is bundled with Hadoop/Spark  
     85        CommandLineParser parser = new GnuParser();  
    9786        HelpFormatter formatter = new HelpFormatter(); 
    9887        CommandLine cmd; 
     
    112101 
    113102        //cmd.hasOption("json-filelist") 
     103         
    114104        String verbosity_str = cmd.getOptionValue("verbosity","0"); 
    115105        int verbosity = Integer.parseInt(verbosity_str); 
    116  
    117         //System.out.println(inputFilePath); 
    118         //System.out.println(outputFilePath); 
    119  
    120106 
    121107        String[] filtered_args = cmd.getArgs(); 
     
    129115        String output_dir = filtered_args[2]; 
    130116 
    131  
    132         //String json_list_filename = cmd.getArgs()[0]; // args[0]; 
    133         //String json_list_filename = args[0]; 
    134         //int num_cores = 2; 
    135  
    136117        PrepareForIngest prep_for_ingest = new PrepareForIngest(input_dir,json_list_filename,output_dir,verbosity); 
    137118        prep_for_ingest.exec();