source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java@ 30940

Last change on this file since 30940 was 30940, checked in by davidb, 8 years ago

Changed to use a URI rather than the fileIn string directly

  • Property svn:executable set to *
File size: 4.7 KB
Line 
1package org.hathitrust;
2
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
24
25public class ClusterFileIO {
26
27 protected static FileSystem _fs = null;
28
29 public static void initOLD(JavaSparkContext java_spark_context)
30 {
31 try {
32 Configuration hadoop_conf = java_spark_context.hadoopConfiguration();
33 _fs = FileSystem.get(hadoop_conf);
34 }
35 catch (IOException e) {
36 e.printStackTrace();
37 }
38 }
39
40 public static void init(String input_dir)
41 {
42 try {
43 Configuration conf = new Configuration();
44 URI uri = new URI(input_dir);
45 _fs = FileSystem.get(uri, conf);
46 }
47 catch (URISyntaxException e) {
48 e.printStackTrace();
49 }
50 catch (IOException e) {
51 e.printStackTrace();
52 }
53
54 }
55 public static boolean isHDFS(String fileIn)
56 {
57 return fileIn.startsWith("hdfs://");
58 }
59
60 protected static boolean exists(String file)
61 {
62 //Configuration conf = jsc.hadoopConfiguration();
63 //FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
64 boolean exists = false;
65
66 try {
67 exists = _fs.exists(new Path(file));
68 } catch (IllegalArgumentException e) {
69 exists = false;
70 } catch (IOException e) {
71 exists = false;
72 }
73
74 return exists;
75 }
76 protected static BufferedInputStream getBufferedInputStream(String fileIn)
77 throws IOException
78 {
79 BufferedInputStream bis = null;
80
81 if (isHDFS(fileIn)) {
82 URI uri = URI.create (fileIn);
83 //Configuration conf = new Configuration();
84 //FileSystem file = FileSystem.get(uri, conf);
85 //FSDataInputStream fin = file.open(new Path(uri));
86
87 //FSDataInputStream fin = _fs.open(new Path(fileIn));
88 FSDataInputStream fin = _fs.open(new Path(uri));
89
90 bis = new BufferedInputStream(fin);
91 }
92 else {
93 /*
94 // Trim 'file://' off the front
95 String local_file_in = fileIn;
96 if (local_file_in.startsWith("file://")) {
97 local_file_in = fileIn.substring("file://".length());
98 }
99 FileInputStream fin = new FileInputStream(local_file_in);
100 bis = new BufferedInputStream(fin);
101 */
102
103 FSDataInputStream fin = _fs.open(new Path(fileIn));
104
105 bis = new BufferedInputStream(fin);
106 }
107
108 return bis;
109 }
110
111 protected static BufferedOutputStream getBufferedOutputStream(String fileOut)
112 throws IOException
113 {
114 BufferedOutputStream bos = null;
115
116 if (fileOut.startsWith("hdfs://")) {
117 URI uri = URI.create (fileOut);
118 Configuration conf = new Configuration();
119 FileSystem file = FileSystem.get(uri, conf);
120 FSDataOutputStream fout = file.create(new Path(uri));
121
122 bos = new BufferedOutputStream(fout);
123 }
124 else {
125 // Trim 'file://' off the front
126 String local_file_out = fileOut;
127 if (local_file_out.startsWith("file://")) {
128 local_file_out = fileOut.substring("file://".length());
129 }
130 FileOutputStream fout = new FileOutputStream(local_file_out);
131 bos = new BufferedOutputStream(fout);
132 }
133
134 return bos;
135 }
136
137 protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
138 throws IOException, CompressorException
139 {
140 BufferedInputStream bis = getBufferedInputStream(fileIn);
141 CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis);
142 BufferedReader br = new BufferedReader(new InputStreamReader(cis,"UTF8"));
143 return br;
144 }
145
146 protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
147 throws IOException, CompressorException
148 {
149 BufferedOutputStream bos = getBufferedOutputStream(fileOut);
150 CompressorStreamFactory csf = new CompressorStreamFactory();
151 CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2,bos);
152 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(cos,"UTF8"));
153 return bw;
154 }
155
156}
Note: See TracBrowser for help on using the repository browser.