[30938] | 1 | package org.hathitrust;
|
---|
| 2 |
|
---|
| 3 | import java.io.BufferedInputStream;
|
---|
| 4 | import java.io.BufferedOutputStream;
|
---|
| 5 | import java.io.BufferedReader;
|
---|
| 6 | import java.io.BufferedWriter;
|
---|
| 7 | import java.io.FileInputStream;
|
---|
| 8 | import java.io.FileOutputStream;
|
---|
| 9 | import java.io.IOException;
|
---|
| 10 | import java.io.InputStreamReader;
|
---|
| 11 | import java.io.OutputStreamWriter;
|
---|
| 12 | import java.net.URI;
|
---|
| 13 | import java.net.URISyntaxException;
|
---|
| 14 |
|
---|
| 15 | import org.apache.commons.compress.compressors.CompressorException;
|
---|
| 16 | import org.apache.commons.compress.compressors.CompressorInputStream;
|
---|
| 17 | import org.apache.commons.compress.compressors.CompressorOutputStream;
|
---|
| 18 | import org.apache.commons.compress.compressors.CompressorStreamFactory;
|
---|
| 19 | import org.apache.hadoop.conf.Configuration;
|
---|
| 20 | import org.apache.hadoop.fs.FSDataInputStream;
|
---|
| 21 | import org.apache.hadoop.fs.FSDataOutputStream;
|
---|
| 22 | import org.apache.hadoop.fs.FileSystem;
|
---|
| 23 | import org.apache.hadoop.fs.Path;
|
---|
| 24 | import org.apache.spark.api.java.JavaSparkContext;
|
---|
| 25 |
|
---|
| 26 | public class ClusterFileIO {
|
---|
| 27 |
|
---|
| 28 | protected static FileSystem _fs = null;
|
---|
| 29 |
|
---|
| 30 | public static void initOLD(JavaSparkContext java_spark_context)
|
---|
| 31 | {
|
---|
| 32 | try {
|
---|
| 33 | Configuration hadoop_conf = java_spark_context.hadoopConfiguration();
|
---|
| 34 | _fs = FileSystem.get(hadoop_conf);
|
---|
| 35 | }
|
---|
| 36 | catch (IOException e) {
|
---|
| 37 | e.printStackTrace();
|
---|
| 38 | }
|
---|
| 39 | }
|
---|
| 40 |
|
---|
| 41 | public static void init(String input_dir)
|
---|
| 42 | {
|
---|
| 43 | try {
|
---|
| 44 | Configuration conf = new Configuration();
|
---|
| 45 | URI uri = new URI(input_dir);
|
---|
| 46 | _fs = FileSystem.get(uri, conf);
|
---|
| 47 | }
|
---|
| 48 | catch (URISyntaxException e) {
|
---|
| 49 | e.printStackTrace();
|
---|
| 50 | }
|
---|
| 51 | catch (IOException e) {
|
---|
| 52 | e.printStackTrace();
|
---|
| 53 | }
|
---|
| 54 |
|
---|
| 55 | }
|
---|
| 56 | public static boolean isHDFS(String fileIn)
|
---|
| 57 | {
|
---|
| 58 | return fileIn.startsWith("hdfs://");
|
---|
| 59 | }
|
---|
| 60 |
|
---|
| 61 | protected static boolean exists(String file)
|
---|
| 62 | {
|
---|
| 63 | //Configuration conf = jsc.hadoopConfiguration();
|
---|
| 64 | //FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
|
---|
| 65 | boolean exists = false;
|
---|
| 66 |
|
---|
| 67 | try {
|
---|
| 68 | exists = _fs.exists(new Path(file));
|
---|
| 69 | } catch (IllegalArgumentException e) {
|
---|
| 70 | exists = false;
|
---|
| 71 | } catch (IOException e) {
|
---|
| 72 | exists = false;
|
---|
| 73 | }
|
---|
| 74 |
|
---|
| 75 | return exists;
|
---|
| 76 | }
|
---|
| 77 | protected static BufferedInputStream getBufferedInputStream(String fileIn)
|
---|
| 78 | throws IOException
|
---|
| 79 | {
|
---|
| 80 | BufferedInputStream bis = null;
|
---|
| 81 |
|
---|
| 82 | if (isHDFS(fileIn)) {
|
---|
| 83 | //URI uri = URI.create (fileIn);
|
---|
| 84 | //Configuration conf = new Configuration();
|
---|
| 85 | //FileSystem file = FileSystem.get(uri, conf);
|
---|
| 86 | //FSDataInputStream fin = file.open(new Path(uri));
|
---|
| 87 |
|
---|
| 88 | FSDataInputStream fin = _fs.open(new Path(fileIn));
|
---|
| 89 |
|
---|
| 90 | bis = new BufferedInputStream(fin);
|
---|
| 91 | }
|
---|
| 92 | else {
|
---|
| 93 | /*
|
---|
| 94 | // Trim 'file://' off the front
|
---|
| 95 | String local_file_in = fileIn;
|
---|
| 96 | if (local_file_in.startsWith("file://")) {
|
---|
| 97 | local_file_in = fileIn.substring("file://".length());
|
---|
| 98 | }
|
---|
| 99 | FileInputStream fin = new FileInputStream(local_file_in);
|
---|
| 100 | bis = new BufferedInputStream(fin);
|
---|
| 101 | */
|
---|
| 102 |
|
---|
| 103 | FSDataInputStream fin = _fs.open(new Path(fileIn));
|
---|
| 104 |
|
---|
| 105 | bis = new BufferedInputStream(fin);
|
---|
| 106 | }
|
---|
| 107 |
|
---|
| 108 | return bis;
|
---|
| 109 | }
|
---|
| 110 |
|
---|
| 111 | protected static BufferedOutputStream getBufferedOutputStream(String fileOut)
|
---|
| 112 | throws IOException
|
---|
| 113 | {
|
---|
| 114 | BufferedOutputStream bos = null;
|
---|
| 115 |
|
---|
| 116 | if (fileOut.startsWith("hdfs://")) {
|
---|
| 117 | URI uri = URI.create (fileOut);
|
---|
| 118 | Configuration conf = new Configuration();
|
---|
| 119 | FileSystem file = FileSystem.get(uri, conf);
|
---|
| 120 | FSDataOutputStream fout = file.create(new Path(uri));
|
---|
| 121 |
|
---|
| 122 | bos = new BufferedOutputStream(fout);
|
---|
| 123 | }
|
---|
| 124 | else {
|
---|
| 125 | // Trim 'file://' off the front
|
---|
| 126 | String local_file_out = fileOut;
|
---|
| 127 | if (local_file_out.startsWith("file://")) {
|
---|
| 128 | local_file_out = fileOut.substring("file://".length());
|
---|
| 129 | }
|
---|
| 130 | FileOutputStream fout = new FileOutputStream(local_file_out);
|
---|
| 131 | bos = new BufferedOutputStream(fout);
|
---|
| 132 | }
|
---|
| 133 |
|
---|
| 134 | return bos;
|
---|
| 135 | }
|
---|
| 136 |
|
---|
| 137 | protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
|
---|
| 138 | throws IOException, CompressorException
|
---|
| 139 | {
|
---|
| 140 | BufferedInputStream bis = getBufferedInputStream(fileIn);
|
---|
| 141 | CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis);
|
---|
| 142 | BufferedReader br = new BufferedReader(new InputStreamReader(cis,"UTF8"));
|
---|
| 143 | return br;
|
---|
| 144 | }
|
---|
| 145 |
|
---|
| 146 | protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
|
---|
| 147 | throws IOException, CompressorException
|
---|
| 148 | {
|
---|
| 149 | BufferedOutputStream bos = getBufferedOutputStream(fileOut);
|
---|
| 150 | CompressorStreamFactory csf = new CompressorStreamFactory();
|
---|
| 151 | CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2,bos);
|
---|
| 152 | BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(cos,"UTF8"));
|
---|
| 153 | return bw;
|
---|
| 154 | }
|
---|
| 155 |
|
---|
| 156 | }
|
---|