source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java@ 30942

Last change on this file since 30942 was 30941, checked in by davidb, 7 years ago

Moved to getFileSystemInstance() method to play nice on cluster

  • Property svn:executable set to *
File size: 5.2 KB
Line 
1package org.hathitrust;
2
3import java.io.BufferedInputStream;
4import java.io.BufferedOutputStream;
5import java.io.BufferedReader;
6import java.io.BufferedWriter;
7import java.io.FileOutputStream;
8import java.io.IOException;
9import java.io.InputStreamReader;
10import java.io.OutputStreamWriter;
11import java.net.URI;
12import java.net.URISyntaxException;
13
14import org.apache.commons.compress.compressors.CompressorException;
15import org.apache.commons.compress.compressors.CompressorInputStream;
16import org.apache.commons.compress.compressors.CompressorOutputStream;
17import org.apache.commons.compress.compressors.CompressorStreamFactory;
18import org.apache.hadoop.conf.Configuration;
19import org.apache.hadoop.fs.FSDataInputStream;
20import org.apache.hadoop.fs.FSDataOutputStream;
21import org.apache.hadoop.fs.FileSystem;
22import org.apache.hadoop.fs.Path;
23import org.apache.spark.api.java.JavaSparkContext;
24
25public class ClusterFileIO {
26
27 private static FileSystem __fs = null;
28
29 /*
30 public static void initOLD(JavaSparkContext java_spark_context)
31 {
32 try {
33 Configuration hadoop_conf = java_spark_context.hadoopConfiguration();
34 _fs = FileSystem.get(hadoop_conf);
35 }
36 catch (IOException e) {
37 e.printStackTrace();
38 }
39 }
40 */
41
42 /*
43 public static void init(String input_dir)
44 {
45 try {
46 Configuration conf = new Configuration();
47 URI uri = new URI(input_dir);
48 _fs = FileSystem.get(uri, conf);
49 }
50 catch (URISyntaxException e) {
51 e.printStackTrace();
52 }
53 catch (IOException e) {
54 e.printStackTrace();
55 }
56
57 }
58 */
59
60 public static FileSystem getFileSystemInstance(String input_dir)
61 {
62 if (__fs == null) {
63
64 try {
65 Configuration conf = new Configuration();
66 URI uri = new URI(input_dir);
67 __fs = FileSystem.get(uri, conf);
68 }
69 catch (URISyntaxException e) {
70 e.printStackTrace();
71 }
72 catch (IOException e) {
73 e.printStackTrace();
74 }
75 }
76 return __fs;
77 }
78
79 public static boolean isHDFS(String fileIn)
80 {
81 return fileIn.startsWith("hdfs://");
82 }
83
84 protected static boolean exists(String file)
85 {
86 FileSystem fs = getFileSystemInstance(file);
87
88 //Configuration conf = jsc.hadoopConfiguration();
89 //FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
90 boolean exists = false;
91
92 try {
93 exists = fs.exists(new Path(file));
94 } catch (IllegalArgumentException e) {
95 exists = false;
96 } catch (IOException e) {
97 exists = false;
98 }
99
100 return exists;
101 }
102 protected static BufferedInputStream getBufferedInputStream(String fileIn)
103 throws IOException
104 {
105 FileSystem fs = getFileSystemInstance(fileIn);
106
107 BufferedInputStream bis = null;
108
109 if (isHDFS(fileIn)) {
110 URI uri = URI.create (fileIn);
111 //Configuration conf = new Configuration();
112 //FileSystem file = FileSystem.get(uri, conf);
113 //FSDataInputStream fin = file.open(new Path(uri));
114
115 //FSDataInputStream fin = _fs.open(new Path(fileIn));
116 FSDataInputStream fin = fs.open(new Path(uri));
117
118 bis = new BufferedInputStream(fin);
119 }
120 else {
121 /*
122 // Trim 'file://' off the front
123 String local_file_in = fileIn;
124 if (local_file_in.startsWith("file://")) {
125 local_file_in = fileIn.substring("file://".length());
126 }
127 FileInputStream fin = new FileInputStream(local_file_in);
128 bis = new BufferedInputStream(fin);
129 */
130
131 FSDataInputStream fin = fs.open(new Path(fileIn));
132
133 bis = new BufferedInputStream(fin);
134 }
135
136 return bis;
137 }
138
139 protected static BufferedOutputStream getBufferedOutputStream(String fileOut)
140 throws IOException
141 {
142 BufferedOutputStream bos = null;
143
144 if (fileOut.startsWith("hdfs://")) {
145 URI uri = URI.create (fileOut);
146 Configuration conf = new Configuration();
147 FileSystem file = FileSystem.get(uri, conf);
148 FSDataOutputStream fout = file.create(new Path(uri));
149
150 bos = new BufferedOutputStream(fout);
151 }
152 else {
153 // Trim 'file://' off the front
154 String local_file_out = fileOut;
155 if (local_file_out.startsWith("file://")) {
156 local_file_out = fileOut.substring("file://".length());
157 }
158 FileOutputStream fout = new FileOutputStream(local_file_out);
159 bos = new BufferedOutputStream(fout);
160 }
161
162 return bos;
163 }
164
165 protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
166 throws IOException, CompressorException
167 {
168 BufferedInputStream bis = getBufferedInputStream(fileIn);
169 CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis);
170 BufferedReader br = new BufferedReader(new InputStreamReader(cis,"UTF8"));
171 return br;
172 }
173
174 protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
175 throws IOException, CompressorException
176 {
177 BufferedOutputStream bos = getBufferedOutputStream(fileOut);
178 CompressorStreamFactory csf = new CompressorStreamFactory();
179 CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2,bos);
180 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(cos,"UTF8"));
181 return bw;
182 }
183
184}
Note: See TracBrowser for help on using the repository browser.