source: other-projects/hathitrust/solr-extracted-features/trunk/src/main/java/org/hathitrust/ClusterFileIO.java@ 30938

Last change on this file since 30938 was 30938, checked in by davidb, 8 years ago

Experiment with using Hadoop's FileSystem class for local file:// access

  • Property svn:executable set to *
File size: 4.7 KB
Line 
package org.hathitrust;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorOutputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
26public class ClusterFileIO {
27
28 protected static FileSystem _fs = null;
29
30 public static void initOLD(JavaSparkContext java_spark_context)
31 {
32 try {
33 Configuration hadoop_conf = java_spark_context.hadoopConfiguration();
34 _fs = FileSystem.get(hadoop_conf);
35 }
36 catch (IOException e) {
37 e.printStackTrace();
38 }
39 }
40
41 public static void init(String input_dir)
42 {
43 try {
44 Configuration conf = new Configuration();
45 URI uri = new URI(input_dir);
46 _fs = FileSystem.get(uri, conf);
47 }
48 catch (URISyntaxException e) {
49 e.printStackTrace();
50 }
51 catch (IOException e) {
52 e.printStackTrace();
53 }
54
55 }
56 public static boolean isHDFS(String fileIn)
57 {
58 return fileIn.startsWith("hdfs://");
59 }
60
61 protected static boolean exists(String file)
62 {
63 //Configuration conf = jsc.hadoopConfiguration();
64 //FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
65 boolean exists = false;
66
67 try {
68 exists = _fs.exists(new Path(file));
69 } catch (IllegalArgumentException e) {
70 exists = false;
71 } catch (IOException e) {
72 exists = false;
73 }
74
75 return exists;
76 }
77 protected static BufferedInputStream getBufferedInputStream(String fileIn)
78 throws IOException
79 {
80 BufferedInputStream bis = null;
81
82 if (isHDFS(fileIn)) {
83 //URI uri = URI.create (fileIn);
84 //Configuration conf = new Configuration();
85 //FileSystem file = FileSystem.get(uri, conf);
86 //FSDataInputStream fin = file.open(new Path(uri));
87
88 FSDataInputStream fin = _fs.open(new Path(fileIn));
89
90 bis = new BufferedInputStream(fin);
91 }
92 else {
93 /*
94 // Trim 'file://' off the front
95 String local_file_in = fileIn;
96 if (local_file_in.startsWith("file://")) {
97 local_file_in = fileIn.substring("file://".length());
98 }
99 FileInputStream fin = new FileInputStream(local_file_in);
100 bis = new BufferedInputStream(fin);
101 */
102
103 FSDataInputStream fin = _fs.open(new Path(fileIn));
104
105 bis = new BufferedInputStream(fin);
106 }
107
108 return bis;
109 }
110
111 protected static BufferedOutputStream getBufferedOutputStream(String fileOut)
112 throws IOException
113 {
114 BufferedOutputStream bos = null;
115
116 if (fileOut.startsWith("hdfs://")) {
117 URI uri = URI.create (fileOut);
118 Configuration conf = new Configuration();
119 FileSystem file = FileSystem.get(uri, conf);
120 FSDataOutputStream fout = file.create(new Path(uri));
121
122 bos = new BufferedOutputStream(fout);
123 }
124 else {
125 // Trim 'file://' off the front
126 String local_file_out = fileOut;
127 if (local_file_out.startsWith("file://")) {
128 local_file_out = fileOut.substring("file://".length());
129 }
130 FileOutputStream fout = new FileOutputStream(local_file_out);
131 bos = new BufferedOutputStream(fout);
132 }
133
134 return bos;
135 }
136
137 protected static BufferedReader getBufferedReaderForCompressedFile(String fileIn)
138 throws IOException, CompressorException
139 {
140 BufferedInputStream bis = getBufferedInputStream(fileIn);
141 CompressorInputStream cis = new CompressorStreamFactory().createCompressorInputStream(bis);
142 BufferedReader br = new BufferedReader(new InputStreamReader(cis,"UTF8"));
143 return br;
144 }
145
146 protected static BufferedWriter getBufferedWriterForCompressedFile(String fileOut)
147 throws IOException, CompressorException
148 {
149 BufferedOutputStream bos = getBufferedOutputStream(fileOut);
150 CompressorStreamFactory csf = new CompressorStreamFactory();
151 CompressorOutputStream cos = csf.createCompressorOutputStream(CompressorStreamFactory.BZIP2,bos);
152 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(cos,"UTF8"));
153 return bw;
154 }
155
156}
Note: See TracBrowser for help on using the repository browser.