source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS/ThriftFH.pm@ 27653

Last change on this file since 27653 was 27653, checked in by jmt12, 11 years ago

Forgot to pull self off the head of arguments

File size: 8.4 KB
Line 
1###########################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software
6# from the New Zealand Digital Library Project at the
7# University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify
12# it under the terms of the GNU General Public License as published by
13# the Free Software Foundation; either version 2 of the License, or
14# (at your option) any later version.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19# GNU General Public License for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24#
25###########################################################################
26
27################################################################################
28## An Object wrapped around a HDThrift file handle usable as a Perl Handle ##
29################################################################################
30
31package FileUtils::HDThriftFS::ThriftFH;
32
33require Tie::Handle;
34
35use base 'Tie::Handle';
36use Devel::Peek;
37use MIME::Base91;
38
39my $debug_encoding = 0;
40
41sub TIEHANDLE
42{
43 my $class = shift;
44 my $thrift_client = shift;
45 my $self = {};
46 $self->{'buffer_length'} = 4096; # 4k blocks
47 $self->{'client'} = $thrift_client;
48 $self->{'fh'} = 0;
49 $self->{'file_length'} = 0;
50 $self->{'mode'} = 'r';
51 $self->{'read_offset'} = 0; # A read offset
52 $self->{'warnings'} = {'binmode' => 0};
53 return bless $self, $class;
54}
55
56sub WRITE
57{
58 my $self = shift;
59 my ($scalar, $length, $offset) = @_;
60 print STDERR "ThriftFH::WRITE() - implement me!\n";
61}
62
63sub PRINT
64{
65 my $self = shift;
66 my $result = 0;
67 # only available in write or append modes
68 if ($self->{'mode'} eq 'w' || $self->{'mode'} eq 'a')
69 {
70 foreach my $decoded_buffer (@_)
71 {
72 if ($debug_encoding)
73 {
74 print STDERR "Print String: \n=== START ===\n"; Dump($decoded_buffer); print STDERR "\n=== END ===\n\n";
75 }
76 # We now need to Base91 encode everything sent through to Thrift Server
77 my $encoded_buffer = MIME::Base91::encode($decoded_buffer);
78 if ($debug_encoding)
79 {
80 print STDERR "Encoded String: \n=== START ===\n"; Dump($encoded_buffer); print STDERR "\n=== END ===\n\n";
81 }
82 $self->{'client'}->write($self->{'fh'}, $encoded_buffer);
83 }
84 $result = 1;
85 }
86 return $result;
87}
88
89sub PRINTF
90{
91 my $self = shift;
92 my $format = shift;
93 print STDERR "ThriftFH::PRINTF() - implement me!\n";
94}
95
96sub READ
97{
98 my $self = shift;
99 my ($scalar_ref, $length, $offset) = @_;
100 my $bytes_read = 0;
101 if (!$self->{'eof'} && $self->{'mode'} eq 'r')
102 {
103 my $encoded_buffer = $self->{'client'}->read($self->{'fh'}, $offset, $length);
104 if ($debug_encoding)
105 {
106 print STDERR "Read String: \n=== START ===\n"; Dump($encoded_buffer); print STDERR "\n=== END ===\n\n";
107 }
108 my $decoded_buffer = MIME::Base91::decode($encoded_buffer);
109 if ($debug_encoding)
110 {
111 print STDERR "Decoded String: \n=== START ===\n"; Dump($decoded_buffer); print STDERR "\n=== END ===\n\n";
112 }
113 $bytes_read = scalar($decoded_buffer);
114 # the only way I can see to modify the callers version of $scalar
115 $_[0] = $decoded_buffer;
116 }
117 return $bytes_read;
118}
119
120sub READLINE
121{
122 my $self = shift;
123 #rint "ThriftFH:READLINE()\n";
124 #rint " - file_length=" . $self->{'file_length'} . "\n";
125 my $line = undef;
126 # out of file? out of lines!
127 # not in read mode - can't read!
128 if (!$self->{'eof'} && $self->{'mode'} eq 'r')
129 {
130 my $found_line = 0;
131 my $search_offset = $self->{'read_offset'};
132 my $search_eof = $self->{'eof'};
133 # grab a 'buffer' of data starting at my current read offset
134 #rint "->read(fh, $search_offset, " . $self->{'buffer_length'} . ")\n";
135 my $encoded_buffer = $self->{'client'}->read($self->{'fh'}, $search_offset, $self->{'buffer_length'});
136 my $decoded_buffer = MIME::Base91::decode($encoded_buffer);
137 #rint " - buffer=|" . $decoded_buffer . "|\n";
138 if (length($decoded_buffer) < $self->{'buffer_length'} || ($self->{'read_offset'} + length($decoded_buffer)) < $self->{'file_length'})
139 {
140 $search_eof = 1;
141 }
142 while (!$search_eof && index($decoded_buffer, "\n") == -1)
143 {
144 #rint " * no newline found yet - filling buffer...\n";
145 $search_offset = $self->{'read_offset'} + length($decoded_buffer);
146 #rint "->read(fh, $search_offset, " . $self->{'buffer_length'} . ")\n";
147 my $more_encoded_buffer = $self->{'client'}->read($self->{'fh'}, $search_offset, $self->{'buffer_length'});
148 my $more_decoded_buffer = MIME::Base91::decode($more_encoded_buffer);
149 #rint " - more_buffer=|" . $more_decoded_buffer . "|\n";
150 $decoded_buffer .= $more_decoded_buffer;
151 #rint " - buffer=|" . $decoded_buffer. "|\n";
152 # if I read less than I asked for, or my next search offset is beyond the
153 # end of the file, then I've run out of 'file'
154 if (length($more_decoded_buffer) < $self->{'buffer_length'} || ($self->{'read_offset'} + length($decoded_buffer)) < $self->{'file_length'})
155 {
156 $search_eof = 1;
157 #rint " * found eof!\n";
158 }
159 }
160 # if buffer contains newline, then we only return a fragment of buffer and
161 # update the offset
162 my $newline_offset = index($decoded_buffer, "\n");
163 if ($newline_offset > -1)
164 {
165 #rint " * found newline - returning part of buffer\n";
166 $line = substr($decoded_buffer, 0, $newline_offset + 1);
167 # rest of buffer ignored
168 }
169 # otherwise we return all of buffer and mark the file as eof
170 else
171 {
172 #rint " * no newline found - returning all of buffer\n";
173 $line = $decoded_buffer;
174 $self->{'eof'} = 1; # out of file
175 }
176 # move the read pointer by however much we read (in either case)
177 $self->{'read_offset'} += length($line);
178 #rint " => |" . $line . "|\n\n";
179 }
180 return $line;
181}
182
183sub GETC
184{
185 my $self = shift;
186 print STDERR "ThriftFH::GETC() - implement me!\n";
187 return '';
188}
189
190sub CLOSE
191{
192 my $self = shift;
193 $self->{'client'}->close($self->{'fh'});
194 $self->{'fh'} = 0;
195 return 1;
196}
197
198
199## @function open()
200#
201sub OPEN
202{
203 my $self = shift;
204 my $path = shift;
205 my $mode = shift;
206 if (!defined $mode)
207 {
208 $mode = '<'; # Default to read as that's least destructive
209 }
210 #rint STDERR "DEBUG: ThriftFH::OPEN(self, $path, $mode)\n";
211 # write mode always creates a new file - clobbering any existing
212 if ($mode eq 'w' || $mode eq '>')
213 {
214 # if file already exists, create() will (eventually) fail - so we need to
215 # ensure it doesn't already exist beforehand
216 if ($self->{'client'}->exists($path))
217 {
218 $self->{'client'}->rm ($path);
219 }
220 $self->{'fh'} = $self->{'client'}->create($path);
221 $self->{'mode'} = 'w'; # writing
222 }
223 else
224 {
225 # open will fail if the file doesn't already exist - so we may
226 # need to create it beforehand
227 if (!$self->{'client'}->exists($path))
228 {
229 $self->{'fh'} = $self->{'client'}->create($path);
230 $self->{'file_length'} = 0;
231 }
232 else
233 {
234 $self->{'fh'} = $self->{'client'}->open($path);
235 # for reading we need to know the file size so as to avoid Thrift throwing
236 # exceptions when we try to read past the end
237 my $file_stat = $self->{'client'}->stat($path);
238 $self->{'file_length'} = $file_stat->{length};
239 }
240 $self->{'mode'} = 'r'; # reading
241 }
242 return $self;
243}
244## open() ##
245
246
247## @function binmode()
248#
249sub BINMODE
250{
251 my $self = shift;
252 # Have we warned about BIN mode not being applicable?
253 if ($self->{'warnings'}->{'binmode'} != 1)
254 {
255 print STDERR "Notice! BIN mode not applicable in ThriftFS (all data base91 encoded)\n";
256 # we have now!
257 $self->{'warnings'}->{'binmode'} = 1;
258 }
259}
260## BINMODE
261
262
263## @function eof()
264#
265sub EOF
266{
267 my $self = shift;
268 return $self->{'eof'};
269}
270## eof() ##
271
272
273sub TELL
274{
275 print STDERR "ThriftFH::TELL() - implement me!\n";
276}
277
278sub SEEK
279{
280 my $self = shift;
281 my ($offset, $whence) = @_;
282 print STDERR "ThriftFH::SEEK() - implement me!\n";
283}
284
285sub DESTROY
286{
287 my $self = shift;
288 if ($self->{'fh'} != 0)
289 {
290 $self->{'client'}->close($self->{'fh'});
291 $self->{'fh'} = 0;
292 }
293}
294
295sub UNTIE
296{
297 my $self = shift;
298}
299
3001;
Note: See TracBrowser for help on using the repository browser.