source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27386

Last change on this file since 27386 was 27386, checked in by jmt12, 11 years ago

Forgot these were just symbolic links to my Dropbox folder - adding in the actual files

File size: 16.3 KB
RevLine 
[27386]1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
27# allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application over and over. In order to
29# connect to the Thrift server this code needs to know the host and port the
30# server may be found on - information currently hard-coded near the top of the
31# script. There are also a number of Perl module API 'bindings' generated by
32# the Thrift compilation process... currently located within the packages of
33# the Parallel Processing extension. Note that I make use of some tie() magic
34# so as to provide calling code with 'file handle'-like objects to interact
35# with (print, readline etc), so that is pretty cool.
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
# Runs at compile time, before the 'use' statements below are resolved: checks
# that the environment variables needed to locate the bundled libraries are
# set, then puts the two library directories at the front of @INC.
BEGIN
{
  foreach my $required ('GEXTPARALLELBUILDING', 'GSDLOS', 'PERL_VERSION')
  {
    die $required . " not set\n" unless defined $ENV{$required};
  }
  my $extension_home = $ENV{'GEXTPARALLELBUILDING'};
  # Resulting search order: ThriftFS Perl API first, then the directory
  # holding the Bit::Vector and Thrift modules
  unshift (@INC,
           $extension_home . '/packages/hadoop-1.1.0/contrib/thriftfs/gen-perl',
           $extension_home . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
}
53
54# Modules - Core
55use Devel::Peek;
56use MIME::Base64;
57use POSIX qw(floor);
58use Symbol;
59use Thrift::Socket;
60use Thrift::BufferedTransport;
61use Thrift::BinaryProtocol;
62# Modules - Thrift
63use HadoopFS::FileSystem;
64# Modules - Greenstone
65use FileUtils::HDThriftFS::ThriftFH;
66use MIME::Base91;
67
# Configuration
# - host and port the Thrift server is expected to be listening on
my ($host, $port) = ("localhost", 58660);
# - when true, the local transfer functions Dump() every raw and encoded
#   block to STDERR
my $debug_encoding = 0;

# Globals - both are populated by _establishClient() on first use
my ($transport, $thrift_client);
76
77
## @function END()
#
# Shutdown hook: closes the shared Thrift transport, but only when one was
# actually created by _establishClient().
#
END
{
  $transport->close() if (defined $transport);
}
## END()
90
91
## @function _establishClient()
#
# Lazily create the shared connection to the Thrift server at $host:$port.
# Does nothing when a client already exists. The module-level $transport and
# $thrift_client are only assigned once the transport has opened
# successfully, so a failed attempt is retried on the next call rather than
# being remembered as a live connection.
#
# On failure the problem is reported through FileUtils::printError() - the
# same reporting function the rest of this module uses.
# NOTE(review): the second argument (1) is carried over from the original
# call and is presumably a 'fatal' flag - confirm against FileUtils.
#
sub _establishClient
{
  return if (defined $thrift_client);

  my $socket = Thrift::Socket->new($host, $port);
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  my $new_transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($new_transport);

  eval { $new_transport->open(); };
  if ($@)
  {
    # copy straight away - $@ is a global and easily clobbered
    my $error = $@;
    # the exception may be a hash-based object carrying a 'message' field or
    # just a plain string; dereferencing a string would itself die under
    # 'use strict'
    my $reason = "$error";
    if (ref($error) && UNIVERSAL::isa($error, 'HASH') && defined $error->{message})
    {
      $reason = $error->{message};
    }
    &FileUtils::printError('Unable to connect: ' . $reason, 1);
    return;
  }

  $transport = $new_transport;
  $thrift_client = HadoopFS::FileSystemClient->new($protocol);
  return;
}
## _establishClient()
115
116
## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://...' URI string into a HadoopFS::Pathname object.
#
# @param  $path  either a HadoopFS::Pathname (returned untouched) or a string
#                that must START with 'HDThriftFS://'
# @return the HadoopFS::Pathname whose pathname is everything after the
#         scheme prefix. If the string is not a valid URI an error is
#         reported via FileUtils::printError() and the argument is returned
#         unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # already a thrift path object - nothing to convert
  return $path if (ref($path) eq 'HadoopFS::Pathname');

  my $scheme = 'HDThriftFS://';
  # the prefix has to be at the very start of the string, as it is stripped
  # by position - a path that merely contains the scheme somewhere in the
  # middle is not a valid URI
  if (defined $path && index($path, $scheme) == 0)
  {
    return HadoopFS::Pathname->new( { pathname => substr($path, length($scheme)) } );
  }

  &FileUtils::printError('Not a valid thrift URI: ' . (defined $path ? $path : ''));
  return $path;
}
## _generateHDFSPath()
136
137
138################################################################################
139################################################################################
140################################################################################
141
142
## @function closeFileHandle()
#
# Close a handle previously produced by openFileHandle() and remove its tie.
#
# @param  $fh_ref  reference to the scalar holding the (glob reference) handle
# @return always 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  my $fh = $$fh_ref;
  close($fh);
  # The tie was made on the glob itself (tie(*$fh, ...)), so the glob is what
  # has to be untied - untie() applied to the scalar that merely holds the
  # glob reference leaves the handle tied.
  if (ref($fh) && UNIVERSAL::isa($fh, 'GLOB'))
  {
    untie(*{$fh});
  }
  return 1;
}
## closeFileHandle()
153
154
## @function fileSize()
#
# Look up the 'length' field of the stat record the Thrift server returns for
# a path.
#
# @param  $path  'HDThriftFS://' URI string or HadoopFS::Pathname object
# @return the value of the stat record's length field
#
sub fileSize
{
  my ($path) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($path);
  return $thrift_client->stat($hdfs_path)->{length};
}
## fileSize()
168
169
## @function fileTest()
#
# HDFS equivalent of Perl's file test operators.
#
# @param  $raw_path  'HDThriftFS://' URI string or HadoopFS::Pathname object
# @param  $test_op   one of '-d', '-e', '-f' or '-l'; defaults to '-e'.
#                    '-l' is handled as '-e'.
# @return 1 when the test holds, 0 otherwise. An unrecognised operator is
#         reported via FileUtils::printError() and yields 0.
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS, so a link test is
  # answered as a plain existence test
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');

  if ($test_op ne '-d' && $test_op ne '-e' && $test_op ne '-f')
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }

  # every supported test starts with an existence check
  return 0 unless ($thrift_client->exists($path));
  return 1 if ($test_op eq '-e');

  # '-d' and '-f' are told apart by the isdir flag of the stat record
  my $is_directory = $thrift_client->stat($path)->{'isdir'};
  if ($test_op eq '-d')
  {
    return ($is_directory ? 1 : 0);
  }
  return ($is_directory ? 0 : 1);
}
## fileTest()
221
222
## @function filenameConcatenate()
#
# Build a path from a leading protocol part and any number of further parts.
# The further parts are joined with '/' and runs of slashes within them are
# collapsed; the protocol part is then prefixed (followed by a '/') exactly
# as given, and finally one trailing slash or backslash, if present, is
# dropped.
#
# @param  $protocol  first path part, left untouched
# @param  ...        remaining path parts
# @return the concatenated path
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  # join and squash repeated slashes - only within the non-protocol parts
  (my $tail = join('/', @parts)) =~ s{/+}{/}g;
  # prefixing the protocol may legitimately introduce multiple slashes
  my $filename = $protocol . '/' . $tail;
  # strip a single trailing separator
  $filename =~ s{[\\/]$}{};
  return $filename;
}
## filenameConcatenate()
238
239
## @function makeDirectory()
#
# Create a directory (via the thrift mkdirs call) unless it is already there.
#
# @param  $raw_path  'HDThriftFS://' URI string or HadoopFS::Pathname object
# @return the result of a final fileTest('-d') on the path, i.e. true only
#         when the directory exists afterwards
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $dir_path = &_generateHDFSPath($raw_path);
  # only ask for creation when the directory is missing
  $thrift_client->mkdirs($dir_path) unless (&fileTest($dir_path, '-d'));
  # success is judged by testing again rather than by the mkdirs result
  return &fileTest($dir_path, '-d');
}
## makeDirectory()
259
260
## @function modificationTime()
#
# Report the modification time of a path from its thrift stat record.
#
# @param  $path  'HDThriftFS://' URI string or HadoopFS::Pathname object
# @return the stat record's modificationTime field divided by 1000 and
#         rounded down (presumably milliseconds converted to whole seconds -
#         TODO confirm the unit against the Thrift interface)
#
sub modificationTime
{
  my ($path) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($path);
  my $raw_time = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
274
275
## @function openFileHandle()
#
# Produce a 'file handle'-like object for a path on the HDFS: a fresh
# anonymous glob is tied to FileUtils::HDThriftFS::ThriftFH (handing it the
# shared thrift client) and then opened with the path object and mode.
#
# @param  $raw_path  'HDThriftFS://' URI string or HadoopFS::Pathname object
# @param  $mode      passed straight through to the tied handle's open
# @param  $fh_ref    reference to the scalar that receives the new handle
# @return 1 on success; dies if the open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($raw_path);
  my $handle = gensym();
  tie(*{$handle}, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # note the argument order: path first, then mode
  unless (open($handle, $hdfs_path, $mode))
  {
    die("Failed to open thriftfs");
  }
  ${$fh_ref} = $handle;
  return 1;
}
## openFileHandle()
292
293
## @function readDirectory()
#
# List the entries of a directory on the HDFS.
#
# @param  $raw_path  'HDThriftFS://' URI string or HadoopFS::Pathname object
# @return reference to an array holding, for each entry returned by the
#         thrift listStatus call, the last component of its 'path' field
#         (empty array when nothing is listed)
#
sub readDirectory
{
  my ($raw_path) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $dir_path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($dir_path);
  my @names;
  if ($listing && @{$listing} > 0)
  {
    # keep only what follows the final slash (or backslash) of each path
    @names = map { my ($name) = $_->{'path'} =~ /([^\\\/]+)$/; $name } @{$listing};
  }
  return \@names;
}
## readDirectory()
316
317
## @function removeFiles()
#
# Remove a single path from the HDFS.
#
# @param  $path       'HDThriftFS://' URI string or HadoopFS::Pathname object
# @param  $recursive  optional flag handed to the thrift rm call; defaults
#                     to 0. Without it only plain files are removed.
# @return 0 when nothing was attempted (path missing, or not a plain file and
#         not recursive); otherwise true if the path no longer exists after
#         the rm call
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $target = &_generateHDFSPath($path);
  # nothing to do for a missing path
  return 0 unless ($thrift_client->exists($target));
  # directories are only touched when a recursive removal was requested
  return 0 unless ($recursive || &fileTest($target, '-f'));
  $thrift_client->rm($target, $recursive);
  # judge success by checking that the path has really gone
  return !$thrift_client->exists($target);
}
## removeFiles()
340
341
## @function removeFilesFiltered()
#
# Depth first, recursive removal of files and directories below the given
# path(s), optionally restricted by regular expressions.
#
# @param  $paths      one 'HDThriftFS://' URI string or a reference to an
#                     array of them
# @param  $accept_re  when defined, only files whose full path matches are
#                     removed
# @param  $reject_re  when defined, files whose full path matches are kept
# @return the number of files and directories removed
#
# Directories are themselves only removed when neither expression is given.
# Problems are written to STDERR and do not stop the traversal.
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my @targets = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $removed_count = 0;
  foreach my $target (@targets)
  {
    # remove trailing slashes (works on our own copy of the list)
    $target =~ s/[\/\\]+$//;
    my $hdfs_path = &_generateHDFSPath($target);

    if (!$thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $target . "\n";
      next;
    }

    if (&fileTest($hdfs_path, '-d'))
    {
      # deal with everything inside the directory first
      foreach my $child (@{&readDirectory($hdfs_path)})
      {
        $removed_count += &removeFilesFiltered($target . '/' . $child, $accept_re, $reject_re);
      }
      # a filtered run leaves the directory structure in place
      next if (defined $accept_re || defined $reject_re);
      # remove this directory - non-recursively so that the command fails
      # if there are (somehow) still files contained within
      $thrift_client->rm($hdfs_path, 0);
      if ($thrift_client->exists($hdfs_path))
      {
        print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove directory: " . $target . "\n";
      }
      else
      {
        $removed_count++;
      }
      next;
    }

    # a plain file: rejection wins over acceptance
    next if (defined $reject_re && ($target =~ m/$reject_re/));
    next unless ((!defined $accept_re) || ($target =~ m/$accept_re/));
    $thrift_client->rm($hdfs_path, 0);
    if ($thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove file: " . $target . "\n";
    }
    else
    {
      $removed_count++;
    }
  }
  return $removed_count;
}
## removeFilesFiltered()
409
410
## @function removeFilesRecursive()
#
# Remove a path and everything below it. Simply removeFilesFiltered() called
# without an accept or a reject expression.
#
# @param  $path  passed unchanged to removeFilesFiltered()
# @return whatever removeFilesFiltered() returns (the number of items removed)
#
sub removeFilesRecursive
{
  return &removeFilesFiltered($_[0], undef, undef);
}
## removeFilesRecursive()
421
422
## @function supportsSymbolicLink
#
# Capability query for calling code: this driver does not offer symbolic
# links.
#
# @return always 0
#
sub supportsSymbolicLink
{
  my $symlinks_supported = 0;
  return $symlinks_supported;
}
## supportsSymbolicLink()
430
431
## @function transferFile()
#
# Move or copy a file from one HDFS location to another.
#
# @param  $mode  'MOVE' (a thrift rename) or 'COPY' (block-wise read/write);
#                any other mode is reported as an error
# @param  $src   source 'HDThriftFS://' URI
# @param  $dst   destination 'HDThriftFS://' URI; when it names an existing
#                directory the source's filename is appended to it
# @return 0 when the source is missing, the destination already exists or
#         the mode is unknown; otherwise true if the destination exists once
#         the transfer has been carried out
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  # transferring 'into' a directory keeps the source's filename
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  # never overwrite
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    my $fhin = $thrift_client->open($src_path);
    my $fhout = $thrift_client->create($dst_path);
    # Whatever string read() hands back is passed to write() untouched.
    # NOTE(review): the local transfer functions Base91 encode/decode around
    # these same calls, so the strings exchanged here are presumably encoded
    # on the server side - confirm before relying on this for binary files.
    my $offset = 0;
    my $length = 4096;
    # Read 4K blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $length);
      # stop on 'no more data' only - a plain truth test would also stop on
      # a block that happens to be the string "0"
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      $offset += $length;
      # a short block means the end of the file has been reached
      last if (length($data) < $length);
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  else
  {
    # previously an unknown mode silently did nothing
    &FileUtils::printError('Unknown or unsupported transfer mode: ' . (defined $mode ? $mode : ''));
    return 0;
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()
500
501
## @function transferFileFromLocal()
#
# Upload a file from the local filesystem onto the HDFS, Base91 encoding each
# 4K block before it is handed to the thrift write call.
#
# @param  $mode  'MOVE' additionally deletes the local source afterwards;
#                any other value just copies
# @param  $src   local source path
# @param  $dst   destination 'HDThriftFS://' URI; when it names an existing
#                directory the source's filename is appended to it
# @return 0 when the destination file already exists; otherwise true when
#         the source was read without error and the destination exists
#         afterwards (and, for 'MOVE', the local source has gone)
#
# Dies if the local source cannot be opened. The local source is only ever
# deleted once the upload is known to have succeeded.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open the local file first so a missing source never creates a remote file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $fhout = $thrift_client->create($dst_path);
  my $decoded = '';
  my $bytes_read;
  while ($bytes_read = read($fhin, $decoded, 4096))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  # read() returns 0 at end of file but undef on error - only the former
  # means the whole file has been sent
  my $read_error = (defined $bytes_read) ? undef : "$!";
  close($fhin);
  $thrift_client->close($fhout);
  if (defined $read_error)
  {
    &FileUtils::printError('Failed reading source file (during ' . $mode . '): ' . $src . ' (' . $read_error . ')');
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (!defined($read_error)) && $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem - but never
  # when the upload failed, as that would lose the only copy
  if ($result && $mode eq 'MOVE')
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
559
560
## @function transferFileToLocal()
#
# Download a file from the HDFS to the local filesystem, Base91 decoding each
# block returned by the thrift read call before writing it out.
#
# @param  $mode  'MOVE' additionally removes the HDFS source afterwards;
#                any other value just copies
# @param  $src   source 'HDThriftFS://' URI
# @param  $dst   local destination path; when it names an existing directory
#                the source's filename is appended to it
# @return 0 when the source is missing or the destination already exists;
#         otherwise true when every block was written, the local file was
#         closed cleanly and exists (and, for 'MOVE', the source has gone)
#
# Dies if the local destination cannot be opened. The HDFS source is only
# ever removed once the local copy is known to be complete.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # make sure the shared thrift client exists before using it
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  # never overwrite
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst);
  my $fhin = $thrift_client->open($src_path);
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  my $write_error;
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $length);
    # stop on 'no more data' only - a plain truth test would also stop on a
    # block that happens to be the string "0"
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    unless (print {$fhout} $decoded)
    {
      $write_error = "$!";
      last;
    }
    last if (length ($encoded) < $length);
    $offset += $length;
  }
  # buffered write problems (e.g. a full disk) may only surface at close
  unless (close($fhout))
  {
    $write_error = "$!" unless (defined $write_error);
  }
  $thrift_client->close($fhin);
  if (defined $write_error)
  {
    &FileUtils::printError('Failed writing destination file (during ' . $mode . '): ' . $dst . ' (' . $write_error . ')');
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (!defined($write_error)) && (-f $dst);
  # if moving, remove the source file from the HDFS filesystem - but never
  # when the local copy is incomplete, as that would lose the only copy
  if ($result && $mode eq 'MOVE')
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
622
623
6241;
Note: See TracBrowser for help on using the repository browser.