package org.hathitrust;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Static helpers for opening, creating and testing files through the Hadoop
 * {@link FileSystem} API, plus convenience wrappers that layer
 * commons-compress (de)compression and character decoding on top.
 */
public class ClusterFileIO
{
    /** Prefix that marks a path as living on HDFS. */
    private static final String HDFS_PREFIX = "hdfs://";

    /** Prefix stripped from local output paths before handing them to java.io. */
    private static final String LOCAL_FILE_PREFIX = "file://";

    /** Charset name used for the text readers/writers returned by this class. */
    private static final String TEXT_ENCODING = "UTF8";

    /**
     * First file system successfully resolved by
     * {@link #getFileSystemInstance(String)}. It is reused for every later
     * path that names no scheme, or names the same scheme/authority.
     */
    private static FileSystem __fs = null;

    /**
     * Resolves the Hadoop file system responsible for {@code input_dir}.
     *
     * <p>The first successfully resolved file system is remembered. Later
     * calls reuse it when the path has no scheme or when its scheme (and
     * authority, if present) match the remembered instance; otherwise the
     * file system is looked up from the path's own URI so that, for example,
     * a {@code file://} path is not sent to a previously cached HDFS instance.
     *
     * <p>Failures are reported on stderr rather than thrown. When resolution
     * fails the remembered instance is returned as a fallback, which is
     * {@code null} if nothing has been resolved yet.
     *
     * @param input_dir path or URI string identifying the target location
     * @return the resolved file system, the remembered fallback, or
     *         {@code null} if none is available
     */
    protected static FileSystem getFileSystemInstance(String input_dir)
    {
        URI uri;
        try {
            uri = new URI(input_dir);
        }
        catch (URISyntaxException e) {
            // With a remembered instance this is the same outcome the lazy
            // initialisation always had (the string was never parsed again),
            // so only report the problem when there is nothing to fall back on.
            if (__fs == null) {
                e.printStackTrace();
            }
            return __fs;
        }

        try {
            if (__fs == null) {
                __fs = FileSystem.get(uri, new Configuration());
                return __fs;
            }
            if (isServedBy(__fs, uri)) {
                return __fs;
            }
            // Different scheme or authority: ask Hadoop for the matching instance.
            return FileSystem.get(uri, new Configuration());
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return __fs;
    }

    /**
     * Tells whether {@code uri} can be handled by {@code fs}: true when the
     * URI has no scheme, or when its scheme equals the file system's scheme
     * and its authority is either absent or equal to the file system's.
     */
    private static boolean isServedBy(FileSystem fs, URI uri)
    {
        String scheme = uri.getScheme();
        if (scheme == null) {
            return true;
        }
        URI fs_uri = fs.getUri();
        if (!scheme.equalsIgnoreCase(fs_uri.getScheme())) {
            return false;
        }
        String authority = uri.getAuthority();
        return authority == null || authority.equalsIgnoreCase(fs_uri.getAuthority());
    }

    /**
     * Like {@link #getFileSystemInstance(String)} but turns the "nothing
     * could be resolved" case into an {@link IOException} instead of a later
     * {@link NullPointerException}.
     */
    private static FileSystem requireFileSystem(String location) throws IOException
    {
        FileSystem fs = getFileSystemInstance(location);
        if (fs == null) {
            throw new IOException("Unable to obtain a Hadoop FileSystem for: " + location);
        }
        return fs;
    }

    /** Closes {@code stream} during error cleanup, ignoring any secondary failure. */
    private static void closeQuietly(Closeable stream)
    {
        try {
            stream.close();
        }
        catch (IOException ignored) {
            // The caller is already propagating the original failure.
        }
    }

    /**
     * @param fileIn path or URI string
     * @return {@code true} if {@code fileIn} starts with {@code hdfs://}
     */
    public static boolean isHDFS(String fileIn)
    {
        return fileIn.startsWith(HDFS_PREFIX);
    }

    /**
     * Tests whether {@code file} exists on its file system.
     *
     * @param file path or URI string to test
     * @return {@code true} if the file system reports the path as existing;
     *         {@code false} if it does not, if no file system could be
     *         resolved, or if the check itself failed
     */
    public static boolean exists(String file)
    {
        FileSystem fs = getFileSystemInstance(file);
        if (fs == null) {
            return false;
        }

        try {
            return fs.exists(new Path(file));
        }
        catch (IllegalArgumentException e) {
            return false;
        }
        catch (IOException e) {
            return false;
        }
    }

    /**
     * Strips {@code suffix} from the end of {@code file}.
     *
     * @param file   string to trim
     * @param suffix trailing text to remove
     * @return {@code file} without the trailing {@code suffix}, or
     *         {@code file} unchanged when it does not end with {@code suffix}
     */
    public static String removeSuffix(String file, String suffix)
    {
        if (!file.endsWith(suffix)) {
            // Avoid chopping unrelated characters (or indexing below zero).
            return file;
        }
        return file.substring(0, file.length() - suffix.length());
    }

    /**
     * Creates {@code dir}, including missing parents, if it does not already exist.
     *
     * @param dir directory path or URI string
     * @return {@code true} only if the directory was absent and
     *         {@code mkdirs} reported success; {@code false} if it already
     *         existed, no file system could be resolved, or creation failed
     */
    public static boolean createDirectoryAll(String dir)
    {
        FileSystem fs = getFileSystemInstance(dir);
        if (fs == null || exists(dir)) {
            return false;
        }

        boolean created_dir = false;
        try {
            Path path = new Path(new URI(dir));
            created_dir = fs.mkdirs(path);
        }
        catch (URISyntaxException e) {
            e.printStackTrace();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return created_dir;
    }

    /**
     * Opens {@code fileIn} for reading through the Hadoop file system API.
     *
     * @param fileIn path or URI string of the file to read
     * @return a buffered stream over the file's bytes
     * @throws IOException if no file system could be resolved or the file
     *                     cannot be opened
     */
    public static BufferedInputStream getBufferedInputStream(String fileIn)
        throws IOException
    {
        FileSystem fs = requireFileSystem(fileIn);

        // HDFS locations are parsed as URIs; anything else is handed to Path as a plain string.
        Path path = isHDFS(fileIn) ? new Path(URI.create(fileIn)) : new Path(fileIn);

        FSDataInputStream fin = fs.open(path);
        return new BufferedInputStream(fin);
    }

    /**
     * Opens {@code fileOut} for writing. {@code hdfs://} locations are
     * created through the Hadoop file system API; everything else is written
     * with {@link FileOutputStream} after removing a leading {@code file://}.
     *
     * @param fileOut path or URI string of the file to write
     * @return a buffered stream writing to the file
     * @throws IOException if the destination cannot be created
     */
    public static BufferedOutputStream getBufferedOutputStream(String fileOut)
        throws IOException
    {
        if (isHDFS(fileOut)) {
            URI uri = URI.create(fileOut);
            FileSystem fs = requireFileSystem(fileOut);
            FSDataOutputStream fout = fs.create(new Path(uri));
            return new BufferedOutputStream(fout);
        }

        String local_file_out = fileOut;
        if (local_file_out.startsWith(LOCAL_FILE_PREFIX)) {
            local_file_out = fileOut.substring(LOCAL_FILE_PREFIX.length());
        }
        FileOutputStream fout = new FileOutputStream(local_file_out);
        return new BufferedOutputStream(fout);
    }

    /**
     * Opens a compressed text file for reading. The compression format is
     * detected by {@link CompressorStreamFactory} from the stream contents.
     *
     * @param fileIn path or URI string of the compressed file
     * @return a reader over the decompressed text
     * @throws IOException         if the file cannot be opened
     * @throws CompressorException if no decompressor can be created for the stream
     */
    public static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
        throws IOException, CompressorException
    {
        BufferedInputStream bis = getBufferedInputStream(fileIn);
        try {
            CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis);
            return new BufferedReader(new InputStreamReader(cis, TEXT_ENCODING));
        }
        catch (CompressorException e) {
            closeQuietly(bis); // do not leak the already-open file handle
            throw e;
        }
        catch (IOException e) {
            closeQuietly(bis);
            throw e;
        }
        catch (RuntimeException e) {
            closeQuietly(bis);
            throw e;
        }
    }

    /**
     * Opens a file for writing BZIP2-compressed text.
     *
     * @param fileOut path or URI string of the file to create
     * @return a writer whose output is BZIP2-compressed into {@code fileOut}
     * @throws IOException         if the destination cannot be created
     * @throws CompressorException if the compressor cannot be created
     */
    public static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
        throws IOException, CompressorException
    {
        BufferedOutputStream bos = getBufferedOutputStream(fileOut);
        try {
            CompressorStreamFactory csf = new CompressorStreamFactory();
            CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2, bos);
            return new BufferedWriter(new OutputStreamWriter(cos, TEXT_ENCODING));
        }
        catch (CompressorException e) {
            closeQuietly(bos); // do not leak the already-open file handle
            throw e;
        }
        catch (IOException e) {
            closeQuietly(bos);
            throw e;
        }
        catch (RuntimeException e) {
            closeQuietly(bos);
            throw e;
        }
    }
}