package org.hathitrust; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.URI; import java.net.URISyntaxException; import org.apache.commons.compress.compressors.CompressorException; import org.apache.commons.compress.compressors.CompressorInputStream; import org.apache.commons.compress.compressors.CompressorOutputStream; import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaSparkContext; public class ClusterFileIO { protected static FileSystem _fs = null; public static void initOLD(JavaSparkContext java_spark_context) { try { Configuration hadoop_conf = java_spark_context.hadoopConfiguration(); _fs = FileSystem.get(hadoop_conf); } catch (IOException e) { e.printStackTrace(); } } public static void init(String input_dir) { try { Configuration conf = new Configuration(); URI uri = new URI(input_dir); _fs = FileSystem.get(uri, conf); } catch (URISyntaxException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public static boolean isHDFS(String fileIn) { return fileIn.startsWith("hdfs://"); } protected static boolean exists(String file) { //Configuration conf = jsc.hadoopConfiguration(); //FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf); boolean exists = false; try { exists = _fs.exists(new Path(file)); } catch (IllegalArgumentException e) { exists = false; } catch (IOException e) { exists = false; } return exists; } protected static BufferedInputStream getBufferedInputStream(String fileIn) throws IOException { BufferedInputStream bis = null; if (isHDFS(fileIn)) { //URI uri = URI.create (fileIn); //Configuration conf = new Configuration(); //FileSystem file = FileSystem.get(uri, conf); //FSDataInputStream fin = file.open(new Path(uri)); FSDataInputStream fin = _fs.open(new Path(fileIn)); bis = new BufferedInputStream(fin); } else { /* // Trim 'file://' off the front String local_file_in = fileIn; if (local_file_in.startsWith("file://")) { local_file_in = fileIn.substring("file://".length()); } FileInputStream fin = new FileInputStream(local_file_in); bis = new BufferedInputStream(fin); */ FSDataInputStream fin = _fs.open(new Path(fileIn)); bis = new BufferedInputStream(fin); } return bis; } protected static BufferedOutputStream getBufferedOutputStream(String fileOut) throws IOException { BufferedOutputStream bos = null; if (fileOut.startsWith("hdfs://")) { URI uri = URI.create (fileOut); Configuration conf = new Configuration(); FileSystem file = FileSystem.get(uri, conf); FSDataOutputStream fout = file.create(new Path(uri)); bos = new BufferedOutputStream(fout); } else { // Trim 'file://' off the front String local_file_out = fileOut; if (local_file_out.startsWith("file://")) { local_file_out = fileOut.substring("file://".length()); } FileOutputStream fout = new FileOutputStream(local_file_out); bos = new BufferedOutputStream(fout); } return bos; } protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn) throws IOException, CompressorException { BufferedInputStream bis = getBufferedInputStream(fileIn); CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis); BufferedReader br = new BufferedReader(new InputStreamReader(cis,"UTF8")); return br; } protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut) throws IOException, CompressorException { BufferedOutputStream bos = getBufferedOutputStream(fileOut); CompressorStreamFactory csf = new CompressorStreamFactory(); CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2,bos); BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(cos,"UTF8")); return bw; } }