source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS/ThriftFH.pm@ 27384

Last change on this file since 27384 was 27384, checked in by jmt12, 11 years ago

An overriding version of FileUtils that provides several drivers required for parallel processing. In the future we may want to split this off as its own extension and add further drivers for HTTP, FTP, WebDAV etc

File size: 8.1 KB
Line 
1###########################################################################
2#
3# ThriftFH.pm -- a tied file handle for files stored on a HDFS, accessed via thrift
4#
5# A component of the Greenstone digital library software
6# from the New Zealand Digital Library Project at the
7# University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24#
25###########################################################################
26
27################################################################################
28## An Object wrapped around a HDThrift file handle usable as a Perl Handle ##
29################################################################################
30
31package FileUtils::HDThriftFS::ThriftFH;
32
33require Tie::Handle;
34
35use base 'Tie::Handle';
36use Devel::Peek;
37use MIME::Base91;
38
# When set to a true value, PRINT and READ dump (via Devel::Peek's Dump) each
# buffer to STDERR both before and after Base91 encoding/decoding
my $debug_encoding = 0;
40
## @function TIEHANDLE()
#
# Constructor called by Perl's tie() to build the object behind the handle.
# No remote file is opened here - that happens in OPEN().
#
# @param $class         the package name to bless the new object into
# @param $thrift_client the client object all file operations are delegated to
# @return a blessed hash reference holding the state of the handle
#
sub TIEHANDLE
{
  my ($class, $thrift_client) = @_;
  my $self = {
    'buffer_length' => 4096,            # 4k blocks
    'client'        => $thrift_client,
    # READ, READLINE and EOF all consult this flag, so give it a defined
    # value from the start rather than relying on a missing key being false
    'eof'           => 0,
    'fh'            => 0,               # 0 means no remote file is open
    'file_length'   => 0,
    'mode'          => 'r',
    'read_offset'   => 0,               # A read offset
  };
  return bless $self, $class;
}
## TIEHANDLE()
54
## @function WRITE()
#
# Called when the tied handle is written to with syswrite(). Selects the
# requested part of the buffer and sends it through PRINT(), which takes care
# of the mode check, the encoding and the actual call to the client.
#
# @param $scalar the buffer holding the data to write
# @param $length the number of characters to write (default: everything after
#                $offset)
# @param $offset where in $scalar to start; negative values count backwards
#                from the end of the buffer (default: 0)
# @return the number of characters written, or undef if $offset lies outside
#         the buffer or PRINT() refused the data (handle not in a write mode)
#
sub WRITE
{
  my $self = shift;
  my ($scalar, $length, $offset) = @_;
  if (!defined $scalar)
  {
    $scalar = '';
  }
  my $available = length($scalar);
  if (!defined $offset)
  {
    $offset = 0;
  }
  if ($offset < 0)
  {
    $offset += $available;
  }
  if ($offset < 0 || $offset > $available)
  {
    return undef;
  }
  if (!defined $length || $length > $available - $offset)
  {
    $length = $available - $offset;
  }
  if ($length <= 0)
  {
    return 0; # nothing to send
  }
  my $chunk = substr($scalar, $offset, $length);
  # syswrite() writes the data verbatim, so make sure PRINT() cannot add any
  # field or record separators
  local $, = undef;
  local $\ = undef;
  if ($self->PRINT($chunk))
  {
    return length($chunk);
  }
  return undef;
}
## WRITE()
61
## @function PRINT()
#
# Called when the tied handle is printed to with print() or say(). The
# arguments are joined using the output field separator ($,), the output
# record separator ($\) is appended (this is what makes say() add its
# newline), and the result is Base91 encoded and passed to the client in a
# single write() call.
#
# @param @_ the list of values to print; undefined values are treated as
#           empty strings
# @return 1 if the handle is in write ('w') or append ('a') mode, 0 otherwise
#         (in which case nothing is sent)
#
sub PRINT
{
  my $self = shift;
  # only available in write or append modes
  if ($self->{'mode'} ne 'w' && $self->{'mode'} ne 'a')
  {
    return 0;
  }
  my $separator = (defined $,) ? $, : '';
  my $decoded_buffer = join($separator, map { defined $_ ? $_ : '' } @_);
  if (defined $\)
  {
    $decoded_buffer .= $\;
  }
  # no point in a round trip to the server if there is nothing to write
  if (length($decoded_buffer) > 0)
  {
    if ($debug_encoding)
    {
      print STDERR "Print String: \n=== START ===\n"; Dump($decoded_buffer); print STDERR "\n=== END ===\n\n";
    }
    # We now need to Base91 encode everything sent through to Thrift Server
    my $encoded_buffer = MIME::Base91::encode($decoded_buffer);
    if ($debug_encoding)
    {
      print STDERR "Encoded String: \n=== START ===\n"; Dump($encoded_buffer); print STDERR "\n=== END ===\n\n";
    }
    $self->{'client'}->write($self->{'fh'}, $encoded_buffer);
  }
  return 1;
}
## PRINT()
87
## @function PRINTF()
#
# Called when the tied handle is written to with printf(). Formats the
# arguments with sprintf() and sends the resulting string through PRINT().
#
# @param $format the printf style format string
# @param @_      the values to substitute into the format
# @return whatever PRINT() returns: 1 if the handle is in a write mode, 0
#         otherwise
#
sub PRINTF
{
  my $self = shift;
  my $format = shift;
  my $formatted = sprintf($format, @_);
  # printf() never appends the output record separator, and there is only a
  # single string to print, so stop PRINT() from applying either separator
  local $, = undef;
  local $\ = undef;
  return $self->PRINT($formatted);
}
## PRINTF()
94
## @function READ()
#
# Called when the tied handle is read with read() or sysread(). Fetches up to
# $length bytes starting at the handle's current read offset, decodes them
# from Base91, and stores them in the caller's buffer.
#
# The read position is tracked in $self->{'read_offset'} (shared with
# READLINE) and is advanced by the number of bytes returned. When the file
# length is known, requests are trimmed so the client is never asked to read
# beyond the end of the file.
#
# @param $_[0]   the caller's buffer. This is an alias of the caller's
#                variable, so assigning to it is what hands the data back.
# @param $length the maximum number of bytes to read
# @param $offset the position in the *buffer* (not the file) at which to
#                place the data; anything in the buffer after this position
#                is replaced. Negative values count backwards from the end of
#                the buffer, and the buffer is padded with "\0" if the offset
#                lies beyond its end (default: 0)
# @return the number of bytes read; 0 at end of file or when the handle is
#         not in read mode (in which case the buffer is left untouched)
#
sub READ
{
  my $self = shift;
  # $_[0] is deliberately not copied into a lexical - see above
  my (undef, $length, $offset) = @_;
  if ($self->{'mode'} ne 'r')
  {
    return 0;
  }
  if (!defined $length || $length < 0)
  {
    $length = 0;
  }
  my $file_length = $self->{'file_length'};
  my $decoded_buffer = '';
  if (!$self->{'eof'})
  {
    # never ask for more than is left in the file - Thrift may throw an
    # exception if we try to read past the end
    if (defined $file_length)
    {
      my $remaining = $file_length - $self->{'read_offset'};
      if ($remaining < 0)
      {
        $remaining = 0;
      }
      if ($length > $remaining)
      {
        $length = $remaining;
      }
    }
    if ($length > 0)
    {
      my $encoded_buffer = $self->{'client'}->read($self->{'fh'}, $self->{'read_offset'}, $length);
      if ($debug_encoding)
      {
        print STDERR "Read String: \n=== START ===\n"; Dump($encoded_buffer); print STDERR "\n=== END ===\n\n";
      }
      $decoded_buffer = MIME::Base91::decode($encoded_buffer);
      if (!defined $decoded_buffer)
      {
        $decoded_buffer = '';
      }
      if ($debug_encoding)
      {
        print STDERR "Decoded String: \n=== START ===\n"; Dump($decoded_buffer); print STDERR "\n=== END ===\n\n";
      }
    }
  }
  my $bytes_read = length($decoded_buffer);
  $self->{'read_offset'} += $bytes_read;
  # we are out of file if we asked for data and got none, or if the read
  # position has reached the known end of the file
  if (($length > 0 && $bytes_read == 0) || (defined $file_length && $self->{'read_offset'} >= $file_length))
  {
    $self->{'eof'} = 1;
  }
  # place the data into the caller's buffer at the requested buffer offset
  if (!defined $_[0])
  {
    $_[0] = '';
  }
  my $buffer_size = length($_[0]);
  if (!defined $offset)
  {
    $offset = 0;
  }
  if ($offset < 0)
  {
    $offset += $buffer_size;
    if ($offset < 0)
    {
      $offset = 0;
    }
  }
  if ($offset > $buffer_size)
  {
    $_[0] .= "\0" x ($offset - $buffer_size);
  }
  substr($_[0], $offset) = $decoded_buffer;
  return $bytes_read;
}
## READ()
118
## @function READLINE()
#
# Called when the tied handle is read with <HANDLE> or readline(). Lines are
# terminated by "\n" (the input record separator $/ is not consulted).
#
# @return in scalar context the next line (including its trailing newline, if
#         any), or undef once the file is exhausted or if the handle is not
#         in read mode; in list context all the remaining lines, or an empty
#         list if there are none
#
sub READLINE
{
  my $self = shift;
  if (wantarray)
  {
    my @lines = ();
    while (defined(my $line = _read_next_line($self)))
    {
      push(@lines, $line);
    }
    return @lines;
  }
  return _read_next_line($self);
}
## READLINE()


## @function _read_next_line()
#
# Private helper for READLINE(): reads a single line starting at the current
# read offset, pulling in further buffers from the client until a newline is
# found or the file runs out, then advances the read offset past the line.
#
# @param $self the tied handle object
# @return the line, or undef (empty list in list context) if there is nothing
#         left to read
#
sub _read_next_line
{
  my ($self) = @_;
  # out of file? out of lines!
  # not in read mode - can't read!
  if ($self->{'eof'} || $self->{'mode'} ne 'r')
  {
    return;
  }
  # may be undefined if the length could not be determined when opening, in
  # which case we can only detect the end of the file by a short read
  my $file_length = $self->{'file_length'};
  my $decoded_buffer = '';
  my $newline_offset = -1;
  my $search_eof = 0;
  while (!$search_eof && $newline_offset == -1)
  {
    my $search_offset = $self->{'read_offset'} + length($decoded_buffer);
    my $request_length = $self->{'buffer_length'};
    if (defined $file_length)
    {
      # never ask for more than is left in the file - Thrift may throw an
      # exception if we try to read past the end
      my $remaining = $file_length - $search_offset;
      if ($remaining <= 0)
      {
        $search_eof = 1;
        last;
      }
      if ($remaining < $request_length)
      {
        $request_length = $remaining;
      }
    }
    my $encoded_buffer = $self->{'client'}->read($self->{'fh'}, $search_offset, $request_length);
    my $more_decoded_buffer = MIME::Base91::decode($encoded_buffer);
    if (!defined $more_decoded_buffer)
    {
      $more_decoded_buffer = '';
    }
    # if I read nothing, or less than I asked for, then I've run out of 'file'
    if (length($more_decoded_buffer) == 0 || length($more_decoded_buffer) < $request_length)
    {
      $search_eof = 1;
    }
    # only the newly fetched data needs searching for a newline
    my $already_searched = length($decoded_buffer);
    $decoded_buffer .= $more_decoded_buffer;
    $newline_offset = index($decoded_buffer, "\n", $already_searched);
  }
  my $line;
  # if buffer contains newline, then we only return a fragment of buffer (the
  # rest is ignored and will be fetched again by the next call)
  if ($newline_offset > -1)
  {
    $line = substr($decoded_buffer, 0, $newline_offset + 1);
  }
  # otherwise we return all of buffer and mark the file as eof
  else
  {
    $line = $decoded_buffer;
    $self->{'eof'} = 1; # out of file
  }
  # move the read pointer by however much we read (in either case)
  $self->{'read_offset'} += length($line);
  # a final line ending in a newline leaves us exactly at the end of the
  # file; flag that now so the next call doesn't read past the end
  if (defined $file_length && $self->{'read_offset'} >= $file_length)
  {
    $self->{'eof'} = 1;
  }
  # an empty result means there was nothing left - that is not a line
  if (length($line) == 0)
  {
    return;
  }
  return $line;
}
## _read_next_line()
181
## @function GETC()
#
# Called when the tied handle is read with getc(). Fetches the single byte at
# the current read offset from the client, decodes it from Base91, and moves
# the read offset past it.
#
# @return the next character, or undef at the end of the file or if the
#         handle is not in read mode (undef, rather than an empty string, is
#         what lets a caller's "while (defined($c = getc(...)))" terminate)
#
sub GETC
{
  my $self = shift;
  if ($self->{'eof'} || $self->{'mode'} ne 'r')
  {
    return undef;
  }
  my $file_length = $self->{'file_length'};
  # don't ask the client to read past the end of the file
  if (defined $file_length && $self->{'read_offset'} >= $file_length)
  {
    $self->{'eof'} = 1;
    return undef;
  }
  my $encoded_buffer = $self->{'client'}->read($self->{'fh'}, $self->{'read_offset'}, 1);
  my $character = MIME::Base91::decode($encoded_buffer);
  if (!defined $character || length($character) == 0)
  {
    $self->{'eof'} = 1; # out of file
    return undef;
  }
  $self->{'read_offset'} += length($character);
  if (defined $file_length && $self->{'read_offset'} >= $file_length)
  {
    $self->{'eof'} = 1;
  }
  return $character;
}
## GETC()
188
## @function CLOSE()
#
# Called when the tied handle is closed with close(). Asks the client to
# close the remote file (if one is open) and resets the read position so the
# object can safely be reused by a later OPEN().
#
# Closing a handle that is not open is harmless: the client is only called
# when there is a remote file handle to close.
#
# @return always 1
#
sub CLOSE
{
  my $self = shift;
  # 'fh' is reset to 0 whenever no remote file is open
  if ($self->{'fh'})
  {
    $self->{'client'}->close($self->{'fh'});
    $self->{'fh'} = 0;
  }
  $self->{'eof'} = 0;
  $self->{'read_offset'} = 0;
  return 1;
}
## CLOSE()
196
197
## @function OPEN()
#
# Called when the tied handle is opened with open(). As with Perl's own
# open(), a file that is still open on this handle is closed first, and all
# per-file state (read offset, end-of-file flag, file length) starts afresh.
#
# @param $path the path of the file, as understood by the client
# @param $mode 'w' or '>' to write (clobbering any existing file); anything
#              else opens the file for reading, creating it first if it does
#              not exist (default: '<')
# @return the tied handle object itself (a true value)
#
sub OPEN
{
  my $self = shift;
  my $path = shift;
  my $mode = shift;
  if (!defined $mode)
  {
    $mode = '<'; # Default to read as that's least destructive
  }
  #rint STDERR "DEBUG: ThriftFH::OPEN(self, $path, $mode)\n";
  # don't leak the remote file handle of a file we still have open
  if ($self->{'fh'})
  {
    $self->{'client'}->close($self->{'fh'});
    $self->{'fh'} = 0;
  }
  # forget everything we knew about the previous file, otherwise a handle
  # that had reached the end of one file could never read from another
  $self->{'eof'} = 0;
  $self->{'file_length'} = 0;
  $self->{'read_offset'} = 0;
  # write mode always creates a new file - clobbering any existing
  if ($mode eq 'w' || $mode eq '>')
  {
    # if file already exists, create() will (eventually) fail - so we need to
    # ensure it doesn't already exist beforehand
    if ($self->{'client'}->exists($path))
    {
      $self->{'client'}->rm($path);
    }
    $self->{'fh'} = $self->{'client'}->create($path);
    $self->{'mode'} = 'w'; # writing
  }
  else
  {
    # open will fail if the file doesn't already exist - so we may
    # need to create it beforehand
    if (!$self->{'client'}->exists($path))
    {
      $self->{'fh'} = $self->{'client'}->create($path);
      $self->{'file_length'} = 0;
    }
    else
    {
      $self->{'fh'} = $self->{'client'}->open($path);
      # for reading we need to know the file size so as to avoid Thrift throwing
      # exceptions when we try to read past the end
      my $file_stat = $self->{'client'}->stat($path);
      $self->{'file_length'} = $file_stat->{length};
    }
    $self->{'mode'} = 'r'; # reading
  }
  return $self;
}
## OPEN()
244
245
## @function BINMODE()
#
# Called when binmode() is used on the tied handle. Not implemented: all this
# does is report that fact on STDERR.
#
# @return the result of the print to STDERR
#
sub BINMODE
{
  my $notice = "ThriftFH::BINMODE() - implement me!\n";
  return print STDERR $notice;
}
## BINMODE()
250
## @function EOF()
#
# Called when eof() is used on the tied handle.
#
# Besides the flag set by the read functions once they run out of data, a
# handle in read mode whose read offset has reached the known length of the
# file is also reported as being at the end. Without this, a loop such as
# "until (eof(FH)) { $line = <FH>; }" would attempt one read too many after
# consuming a final line that ends in a newline.
#
# @return 1 if there is nothing more to read, 0 otherwise
#
sub EOF
{
  my $self = shift;
  if ($self->{'eof'})
  {
    return 1;
  }
  if ($self->{'mode'} eq 'r' && defined $self->{'file_length'} && $self->{'read_offset'} >= $self->{'file_length'})
  {
    return 1;
  }
  return 0;
}
## EOF()
256
## @function TELL()
#
# Called when tell() is used on the tied handle.
#
# Only handles in read mode keep track of a position (the read offset), so
# for any other mode this is still unimplemented: a message is written to
# STDERR and -1, the value tell() uses to signal an error, is returned.
#
# @return the current read offset in bytes, or -1 if it is not known
#
sub TELL
{
  my $self = shift;
  if (defined $self->{'mode'} && $self->{'mode'} eq 'r')
  {
    return $self->{'read_offset'};
  }
  print STDERR "ThriftFH::TELL() - implement me!\n";
  return -1;
}
## TELL()
261
## @function SEEK()
#
# Called when seek() is used on the tied handle. Moves the read offset used
# by the read functions and updates the end-of-file flag to match.
#
# Only handles in read mode can be repositioned; for any other mode this is
# still unimplemented: a message is written to STDERR and the seek fails.
#
# @param $offset the offset in bytes (default: 0)
# @param $whence what $offset is relative to: 0 for the start of the file,
#                1 for the current position, 2 for the end of the file
#                (default: 0)
# @return 1 on success; 0 if the handle is not in read mode, $whence is not
#         recognised, the file length needed for a seek relative to the end
#         is unknown, or the resulting position would be negative
#
sub SEEK
{
  my $self = shift;
  my ($offset, $whence) = @_;
  if (!defined $self->{'mode'} || $self->{'mode'} ne 'r')
  {
    print STDERR "ThriftFH::SEEK() - implement me!\n";
    return 0;
  }
  if (!defined $offset)
  {
    $offset = 0;
  }
  if (!defined $whence)
  {
    $whence = 0;
  }
  my $file_length = $self->{'file_length'};
  my $position;
  if ($whence == 0)
  {
    $position = $offset;
  }
  elsif ($whence == 1)
  {
    $position = $self->{'read_offset'} + $offset;
  }
  elsif ($whence == 2 && defined $file_length)
  {
    $position = $file_length + $offset;
  }
  if (!defined $position || $position < 0)
  {
    return 0;
  }
  $self->{'read_offset'} = $position;
  # moving back into the file makes it readable again, while moving to (or
  # beyond) the end leaves nothing to read
  if (defined $file_length && $position >= $file_length)
  {
    $self->{'eof'} = 1;
  }
  else
  {
    $self->{'eof'} = 0;
  }
  return 1;
}
## SEEK()
268
## @function DESTROY()
#
# Destructor: makes sure a remote file that is still open gets closed when
# the object goes away.
#
# Destructors can run at any moment - including while an exception is being
# propagated, or during global destruction when the client may already be
# gone - so the global status variables are localised, a missing client is
# tolerated, and a failure to close is reported on STDERR rather than thrown.
#
sub DESTROY
{
  my $self = shift;
  local($., $@, $!, $^E, $?);
  # 'fh' is reset to 0 whenever no remote file is open
  if ($self->{'fh'})
  {
    if (defined $self->{'client'})
    {
      my $closed = eval { $self->{'client'}->close($self->{'fh'}); 1; };
      if (!$closed)
      {
        my $reason = (defined $@ && length($@) > 0) ? $@ : 'unknown error';
        $reason =~ s/\s+$//;
        print STDERR "ThriftFH::DESTROY() - failed to close file: " . $reason . "\n";
      }
    }
    # whatever happened, never try to close this file handle again
    $self->{'fh'} = 0;
  }
}
## DESTROY()
278
## @function UNTIE()
#
# Called when the handle is untied. There is nothing to do here: closing a
# file that is still open is left to DESTROY().
#
# @return the tied handle object
#
sub UNTIE
{
  my $self = $_[0];
  return $self;
}
## UNTIE()
283
2841;
Note: See TracBrowser for help on using the repository browser.