source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java@ 30945

Last change on this file since 30945 was 30945, checked in by davidb, 7 years ago

Getting closer to writing out JSON files

  • Property svn:executable set to *
File size: 5.7 KB
package org.hathitrust;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

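/**
 * Utility routines for reading and writing files transparently across HDFS
 * and the local filesystem, including bzip2-compressed text files.
 *
 * Example usage (a minimal sketch; the output path below is hypothetical):
 *
 *   BufferedWriter bw = ClusterFileIO.getBufferedWriterForCompressedFile(
 *       "hdfs://namenode/user/example/page.json.bz2");
 *   bw.write("{\"id\": \"example\"}\n");
 *   bw.close();
 */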
public class ClusterFileIO {

    private static FileSystem __fs = null;

    /*
    public static void initOLD(JavaSparkContext java_spark_context)
    {
        try {
            Configuration hadoop_conf = java_spark_context.hadoopConfiguration();
            _fs = FileSystem.get(hadoop_conf);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
    */

    /*
    public static void init(String input_dir)
    {
        try {
            Configuration conf = new Configuration();
            URI uri = new URI(input_dir);
            _fs = FileSystem.get(uri, conf);
        }
        catch (URISyntaxException e) {
            e.printStackTrace();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }
    */

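    /**
     * Lazily creates and caches the Hadoop FileSystem handle for the given
     * URI (e.g. hdfs:// or file://). Note that only the first call's URI
     * determines which FileSystem is cached; on failure null is returned.
     */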
    protected static FileSystem getFileSystemInstance(String input_dir)
    {
        if (__fs == null) {
            try {
                Configuration conf = new Configuration();
                URI uri = new URI(input_dir);
                __fs = FileSystem.get(uri, conf);
            }
            catch (URISyntaxException e) {
                e.printStackTrace();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
        return __fs;
    }

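    /**
     * Returns true if the given filename is an HDFS URI (starts with "hdfs://").
     */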
    public static boolean isHDFS(String fileIn)
    {
        return fileIn.startsWith("hdfs://");
    }

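    /**
     * Tests whether the given file or directory exists, treating any
     * IO or argument error as "does not exist".
     */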
    public static boolean exists(String file)
    {
        FileSystem fs = getFileSystemInstance(file);

        boolean exists = false;

        try {
            exists = fs.exists(new Path(file));
        }
        catch (IllegalArgumentException e) {
            exists = false;
        }
        catch (IOException e) {
            exists = false;
        }

        return exists;
    }

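    /**
     * Strips the given suffix from the end of a filename. Assumes the
     * filename actually ends with the suffix; no check is made.
     */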
    public static String removeSuffix(String file, String suffix)
    {
        return file.substring(0, file.length() - suffix.length());
    }

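    /**
     * Creates the given directory, including any missing parents
     * (mkdir -p semantics). Returns true only if the directory was
     * newly created by this call.
     */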
    public static boolean createDirectoryAll(String dir)
    {
        FileSystem fs = getFileSystemInstance(dir);
        boolean created_dir = false;

        if (!exists(dir)) {
            try {
                URI uri = new URI(dir);
                Path path = new Path(uri);
                created_dir = fs.mkdirs(path);
            }
            catch (URISyntaxException e) {
                e.printStackTrace();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        return created_dir;
    }

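    /**
     * Opens a buffered input stream on either an HDFS or local file,
     * as determined by the file's URI scheme.
     */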
    public static BufferedInputStream getBufferedInputStream(String fileIn)
        throws IOException
    {
        FileSystem fs = getFileSystemInstance(fileIn);

        BufferedInputStream bis = null;

        if (isHDFS(fileIn)) {
            URI uri = URI.create(fileIn);
            FSDataInputStream fin = fs.open(new Path(uri));
            bis = new BufferedInputStream(fin);
        }
        else {
            FSDataInputStream fin = fs.open(new Path(fileIn));
            bis = new BufferedInputStream(fin);
        }

        return bis;
    }

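    /**
     * Opens a buffered output stream, creating (or overwriting) the file
     * on HDFS, or on the local filesystem otherwise.
     */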
    public static BufferedOutputStream getBufferedOutputStream(String fileOut)
        throws IOException
    {
        BufferedOutputStream bos = null;

        if (isHDFS(fileOut)) {
            URI uri = URI.create(fileOut);
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(uri, conf);
            FSDataOutputStream fout = fs.create(new Path(uri));

            bos = new BufferedOutputStream(fout);
        }
        else {
            // Trim 'file://' off the front
            String local_file_out = fileOut;
            if (local_file_out.startsWith("file://")) {
                local_file_out = fileOut.substring("file://".length());
            }
            FileOutputStream fout = new FileOutputStream(local_file_out);
            bos = new BufferedOutputStream(fout);
        }

        return bos;
    }

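    /**
     * Opens a UTF-8 reader over a compressed file, with the compression
     * format auto-detected by Commons Compress.
     */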
    public static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
        throws IOException, CompressorException
    {
        BufferedInputStream bis = getBufferedInputStream(fileIn);
        CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis);
        BufferedReader br = new BufferedReader(new InputStreamReader(cis, "UTF-8"));
        return br;
    }

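    /**
     * Opens a UTF-8 writer that bzip2-compresses everything written to
     * the given file.
     */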
    public static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
        throws IOException, CompressorException
    {
        BufferedOutputStream bos = getBufferedOutputStream(fileOut);
        CompressorStreamFactory csf = new CompressorStreamFactory();
        CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2, bos);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(cos, "UTF-8"));
        return bw;
    }

}