################################################################################
#
# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# Thrift acts as a client-server 'relay' between the Perl code and the HDFS. It
# allows for persistent connections and so is significantly faster than
# repeatedly starting Hadoop's Java application. In order to connect to the
# Thrift server this code needs to know the host and port the server may be
# found on - information currently hard-coded near the top of the script.
# There are also a number of Perl module API 'bindings' generated by the Thrift
# compilation process... currently located within the packages of the Parallel
# Processing extension. Note that I make use of some tie() magic so as to
# provide calling code with 'file handle'-like objects to interact with (print,
# readline, etc.), so that is pretty cool.
package FileUtils::HDThriftFS;

# Pragma
use strict;
use warnings;

# Setup Environment
BEGIN {
    die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
    die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
    die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
    # Bit::Vector and Thrift modules
    unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
    # ThriftFS Perl API
    unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/hadoop-1.1.0/contrib/thriftfs/gen-perl');
}

# Modules - Core
use Devel::Peek;
use MIME::Base64;
use POSIX qw(floor);
use Scalar::Util qw(reftype);
use Symbol;
use Thrift::Socket;
use Thrift::BufferedTransport;
use Thrift::BinaryProtocol;

# Modules - Thrift
use HadoopFS::FileSystem;

# Modules - Greenstone
use FileUtils::HDThriftFS::ThriftFH;
use MIME::Base91;

# Configuration
my $host = "localhost";
my $port = 58660;
my $debug_encoding = 0;

# Globals
# - both are only set once a connection has been successfully opened
my $transport;
my $thrift_client;


## @function END()
#
# Ensure the transport layer, if open, is properly closed
#
END {
    if (defined $transport) {
        $transport->close();
    }
}
## END()


## @function _establishClient()
#
# Lazily create the (shared) Thrift client connected to $host:$port. Does
# nothing if a client already exists. On connection failure the error is
# reported through FileUtils::printError() (with its second argument set to
# 1) and the module state is reset so that a later call can try again.
#
sub _establishClient {
    if (!defined $thrift_client) {
        ###rint "Create Thrift Client to $host:$port\n";
        my $socket = Thrift::Socket->new($host, $port);
        $socket->setSendTimeout(10000);
        $socket->setRecvTimeout(20000);
        $transport = Thrift::BufferedTransport->new($socket);
        my $protocol = Thrift::BinaryProtocol->new($transport);
        my $client = HadoopFS::FileSystemClient->new($protocol);
        eval { $transport->open(); };
        if ($@) {
            my $error = $@;
            # The error is expected to be an exception object carrying a
            # 'message' field, but fall back to the raw value (e.g. a plain
            # die string) rather than dying on a strict-refs violation.
            my $message = (defined(reftype($error))
                           && reftype($error) eq 'HASH'
                           && defined $error->{message})
                        ? $error->{message}
                        : "$error";
            # the transport never opened, so there is nothing for END to close
            undef $transport;
            FileUtils::printError('Unable to connect: ' . $message, 1);
            return;
        }
        $thrift_client = $client;
    }
    return;
}
## _establishClient()


## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://...' URI into a HadoopFS::Pathname object. Values
# that already are HadoopFS::Pathname objects are returned unchanged.
#
# @param  $path  a HDThriftFS URI string, or a HadoopFS::Pathname
# @return a HadoopFS::Pathname; if $path is not a valid HDThriftFS URI an error
#         is reported and $path is returned as given
#
sub _generateHDFSPath {
    my ($path) = @_;
    if (ref($path) ne 'HadoopFS::Pathname') {
        # The prefix is stripped by position below, so it must be anchored at
        # the start of the string.
        if ($path !~ /^HDThriftFS:\/\//) {
            FileUtils::printError('Not a valid thrift URI: ' . $path);
        }
        else {
            # 13 == length('HDThriftFS://')
            $path = HadoopFS::Pathname->new( { pathname => substr($path, 13) } );
        }
    }
    return $path;
}
## _generateHDFSPath()


################################################################################
################################################################################
################################################################################


## @function closeFileHandle()
#
# Close and untie a file handle previously created by openFileHandle().
#
# @param  $fh_ref  reference to the tied file handle
# @return 1
#
sub closeFileHandle {
    my $fh_ref = shift(@_);
    close($$fh_ref);
    untie($$fh_ref);
    return 1;
}
## closeFileHandle()


## @function fileSize()
#
# @param  $path  HDThriftFS URI or HadoopFS::Pathname
# @return the 'length' field of the thrift stat() result for $path
#
sub fileSize {
    my ($path) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    # - convert the path into a proper thrift path object
    $path = _generateHDFSPath($path);
    my $file_stat = $thrift_client->stat($path);
    return $file_stat->{length};
}
## fileSize()


## @function fileTest()
#
# Emulate a subset of Perl's file test operators against HDFS.
#
# @param  $raw_path  HDThriftFS URI or HadoopFS::Pathname
# @param  $test_op   one of '-d', '-e', '-f' or '-l'; defaults to '-e'
# @return 1 if the test passes, 0 otherwise (also 0, after reporting an
#         error, for unsupported test operators)
#
sub fileTest {
    my ($raw_path, $test_op) = @_;
    my $result = 0;
    # ensure we have a connection to the thrift server
    _establishClient();
    # - convert the path into a proper thrift path object
    my $path = _generateHDFSPath($raw_path);
    # note: symbolic linking not supported within HDFS, so '-l' is answered as
    # an existence test.
    # NOTE(review): callers asking "is this a symlink?" get true for any
    # existing path - confirm this is what callers expect.
    if (!defined $test_op || $test_op eq '-l') {
        $test_op = '-e';
    }
    if ($test_op eq '-d') {
        if ($thrift_client->exists($path)) {
            my $file = $thrift_client->stat($path);
            if ($file->{'isdir'}) {
                $result = 1;
            }
        }
    }
    elsif ($test_op eq '-e') {
        if ($thrift_client->exists($path)) {
            $result = 1;
        }
    }
    elsif ($test_op eq '-f') {
        if ($thrift_client->exists($path)) {
            my $file = $thrift_client->stat($path);
            if (!$file->{'isdir'}) {
                $result = 1;
            }
        }
    }
    else {
        FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    }
    return $result;
}
## fileTest()


## @function filenameConcatenate()
#
# Join path components with '/' and prefix them with the given protocol.
#
# @param  $protocol  string placed in front of the joined path (followed by a
#                    '/'); repeated slashes this creates are NOT collapsed
# @param  @parts     the path components to join
# @return the joined path, minus at most one trailing slash or backslash
#
sub filenameConcatenate {
    my $protocol = shift(@_);
    my $filename = join('/', @_);
    # collapse runs of forward slashes within the joined components
    $filename =~ s/[\/]+/\//g;
    # prepend the protocol (which may cause multiple slashes)
    $filename = $protocol . '/' . $filename;
    # strip a single trailing slash or backslash
    $filename =~ s/[\\\/]$//;
    return $filename;
}
## filenameConcatenate()


## @function makeDirectory()
#
# Create the directory (via thrift mkdirs) unless it already exists.
#
# @param  $raw_path  HDThriftFS URI or HadoopFS::Pathname
# @return true if the directory exists afterwards
#
sub makeDirectory {
    my ($raw_path) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    # - convert the path into a proper thrift path object
    my $path = _generateHDFSPath($raw_path);
    if (!fileTest($path, '-d')) {
        # - create the directory
        $thrift_client->mkdirs($path);
    }
    # - check that it exists
    return (fileTest($path, '-d'));
}
## makeDirectory()


## @function modificationTime()
#
# @param  $path  HDThriftFS URI or HadoopFS::Pathname
# @return the stat() modificationTime divided by 1000 and floored, i.e. the
#         stat value is treated as milliseconds and returned as whole seconds
#
sub modificationTime {
    my ($path) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    # - convert the path into a proper thrift path object
    $path = _generateHDFSPath($path);
    my $file_stat = $thrift_client->stat($path);
    return floor($file_stat->{modificationTime} / 1000);
}
## modificationTime()


## @function openFileHandle()
#
# Create a file-handle-like object backed by FileUtils::HDThriftFS::ThriftFH
# so callers can use print, readline etc. against an HDFS file.
#
# @param  $raw_path  HDThriftFS URI or HadoopFS::Pathname
# @param  $mode      open mode, passed through to the tied handle's OPEN
# @param  $fh_ref    reference that receives the new handle
# @return 1 (dies if the tied open fails)
#
sub openFileHandle {
    my ($raw_path, $mode, $fh_ref) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    #rint STDERR "DEBUG: openFileHandle($raw_path, $mode, fh_ref)\n";
    my $path = _generateHDFSPath($raw_path);
    my $fh = gensym();
    tie(*$fh, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
    # note: this goes to the tied handle's OPEN, which takes (path, mode)
    open($fh, $path, $mode) or die("Failed to open thriftfs");
    $$fh_ref = $fh;
    return 1;
}
## openFileHandle()


## @function readDirectory()
#
# @param  $raw_path  HDThriftFS URI or HadoopFS::Pathname of a directory
# @return array reference of the final path component of every entry reported
#         by thrift listStatus()
#
sub readDirectory {
    my ($raw_path) = @_;
    my @files;
    # ensure we have a connection to the thrift server
    _establishClient();
    my $path = _generateHDFSPath($raw_path);
    my $raw_files = $thrift_client->listStatus($path);
    if ($raw_files && @{$raw_files} > 0) {
        foreach my $file_stat (@{$raw_files}) {
            my $file_path = $file_stat->{'path'};
            my ($filename) = $file_path =~ /([^\\\/]+)$/;
            push(@files, $filename);
        }
    }
    return \@files;
}
## readDirectory()


## @function removeFiles()
#
# Remove a path. Without $recursive only plain files are removed.
#
# @param  $path       HDThriftFS URI or HadoopFS::Pathname
# @param  $recursive  true to allow (recursive) removal of directories
# @return true if the path existed, was eligible, and no longer exists
#
sub removeFiles {
    my ($path, $recursive) = @_;
    my $result = 0;
    if (!defined $recursive) {
        $recursive = 0;
    }
    # ensure we have a connection to the thrift server
    _establishClient();
    # - convert the path into a proper thrift path object as necessary
    $path = _generateHDFSPath($path);
    if ($thrift_client->exists($path) && ($recursive || fileTest($path, '-f'))) {
        $thrift_client->rm($path, $recursive);
        $result = !$thrift_client->exists($path);
    }
    return $result;
}
## removeFiles()


## @function removeFilesFiltered()
#
# Depth first, recursive removal of files (and, when no filters are given,
# directories) below the given paths.
#
# @param  $paths      a single HDThriftFS URI or an array reference of them
# @param  $accept_re  optional regex; when defined only files matching it are
#                     removed
# @param  $reject_re  optional regex; files matching it are never removed
# @return the number of files and directories removed
#
sub removeFilesFiltered {
    my ($paths, $accept_re, $reject_re) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    # Perform a depth first, recursive, removal of files and directories that
    # match the given accept and reject patterns
    my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
    my $num_removed = 0;
    foreach my $raw_path (@paths_array) {
        # remove trailing slashes (@paths_array is a copy, so the caller's
        # values are not modified)
        $raw_path =~ s/[\/\\]+$//;
        my $path = _generateHDFSPath($raw_path);
        if (!$thrift_client->exists($path)) {
            print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
        }
        elsif (fileTest($path, '-d')) {
            my @files = @{readDirectory($path)};
            foreach my $file (@files) {
                my $child_path = $raw_path . '/' . $file;
                $num_removed += removeFilesFiltered($child_path, $accept_re, $reject_re);
            }
            if (!defined $accept_re && !defined $reject_re) {
                # remove this directory - non-recursively so that the command fails
                # if there are (somehow) still files contained within
                $thrift_client->rm($path, 0);
                if ($thrift_client->exists($path)) {
                    print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove directory: " . $raw_path . "\n";
                }
                else {
                    $num_removed++;
                }
            }
        }
        else {
            if (defined $reject_re && ($raw_path =~ m/$reject_re/)) {
                next;
            }
            if ((!defined $accept_re) || ($raw_path =~ m/$accept_re/)) {
                # remove this file
                $thrift_client->rm($path, 0);
                if ($thrift_client->exists($path)) {
                    print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove file: " . $raw_path . "\n";
                }
                else {
                    $num_removed++;
                }
            }
        }
    }
    return $num_removed;
}
## removeFilesFiltered()


## @function removeFilesRecursive()
#
# Remove a path and everything below it.
#
sub removeFilesRecursive {
    my ($path) = @_;
    # use the more general removeFilesFiltered() function with no accept or reject
    # expressions
    return removeFilesFiltered($path, undef, undef);
}
## removeFilesRecursive()


## @function supportsSymbolicLink
#
# @return 0 - HDFS symbolic links are not supported by this module
#
sub supportsSymbolicLink {
    return 0;
}
## supportsSymbolicLink()


## @function transferFile()
#
# Move or copy a file within HDFS.
#
# @param  $mode  'MOVE' (thrift rename) or 'COPY' (block-by-block read/write)
# @param  $src   HDThriftFS URI of the source file
# @param  $dst   HDThriftFS URI of the destination file, or of an existing
#                directory to place the file in
# @return true if the destination exists afterwards; 0 (after reporting an
#         error) if the source is missing or the destination already exists
#
sub transferFile {
    my ($mode, $src, $dst) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    #rint STDERR "transferFile($mode, $src, $dst)\n";
    my $src_path = _generateHDFSPath($src);
    my $dst_path = _generateHDFSPath($dst);
    if (fileTest($dst_path, '-d')) {
        my ($filename) = $src =~ /([^\\\/]+)$/;
        $dst .= '/' . $filename;
        $dst_path = _generateHDFSPath($dst);
    }
    if (!$thrift_client->exists($src_path)) {
        FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
        return 0;
    }
    if ($thrift_client->exists($dst_path)) {
        FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
        return 0;
    }
    # what happens next depends on the mode, and is either very easy or really
    # hard
    if ($mode eq 'MOVE') {
        $thrift_client->rename($src_path, $dst_path);
    }
    elsif ($mode eq 'COPY') {
        # Open the src file for reading
        #rint STDERR "DEBUG: FHIN opened (should be 'r'): $src\n";
        my $fhin = $thrift_client->open($src_path);
        # Create the dst file for writing
        #rint STDERR "DEBUG: FHOUT created (should be 'w'): $dst\n";
        my $fhout = $thrift_client->create($dst_path);
        # Read all of src file writing to dst file
        # - this is where things have the potential to go wrong, as it doesn't seem
        #   thrift supports writing bytes
        # - only strings. May need to see if I can make Perl behave using black
        #   magic flags (marking string as binary etc) It'll work fine for text
        #   files though
        my $data = undef;
        my $offset = 0;
        my $length = 4096; # Read 4K blocks at a time
        # test definedness/length rather than truth, so a final chunk of "0"
        # is not dropped
        while (defined($data = $thrift_client->read($fhin, $offset, $length))
               && length($data) > 0) {
            $thrift_client->write($fhout, $data);
            $offset += $length;
            if (length ($data) < $length) {
                last;
            }
        }
        # Close files
        $thrift_client->close($fhout);
        $thrift_client->close($fhin);
    }
    my $result = ($thrift_client->exists($dst_path));
    #rint STDERR "transferFile() => $result\n";
    return $result;
}
## transferFile()


## @function transferFileFromLocal()
#
# Move or copy a local file into HDFS. Data is sent in 4K blocks, each Base91
# encoded before being written through thrift.
#
# @param  $mode  'MOVE' or 'COPY'
# @param  $src   local path of the source file
# @param  $dst   HDThriftFS URI of the destination file, or of an existing
#                directory to place the file in
# @return true if the whole source was read and the destination exists (and,
#         for MOVE, the local source was then removed); 0 if the destination
#         already exists. Dies if the source cannot be opened.
#
sub transferFileFromLocal {
    my ($mode, $src, $dst) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    # destination is remote
    my $dst_path = _generateHDFSPath($dst);
    if (fileTest($dst_path, '-d')) {
        my ($filename) = $src =~ /([^\\\/]+)$/;
        $dst .= '/' . $filename;
        $dst_path = _generateHDFSPath($dst);
    }
    # can't replace - if the file already exists
    if (fileTest($dst_path, '-f')) {
        FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
        return 0;
    }
    # copy the file
    my $fhin;
    open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
    my $decoded = '';
    my $fhout = $thrift_client->create($dst_path);
    my $bytes_read;
    while ($bytes_read = read($fhin, $decoded, 4096)) {
        if ($debug_encoding) {
            print STDERR "Writing Data: \n=== START ===\n";
            Dump($decoded);
            print STDERR "\n=== END ===\n\n";
        }
        # Base64 encode to protect binary
        #my $encoded = encode_base64($decoded);
        # Base91 encode to protect binary - we add a Byte Order Marker so the
        # Thrift Server can detect the need to decode the string sent
        my $encoded = MIME::Base91::encode($decoded);
        if ($debug_encoding) {
            print STDERR "Encoded: \n=== START ===\n";
            Dump($encoded);
            print STDERR "\n=== END ===\n\n";
        }
        $thrift_client->write($fhout, $encoded);
    }
    # read() returns undef (rather than 0 at end-of-file) on error - the copy
    # is then incomplete and must not count as a success. Report before
    # close() so $! still describes the read failure.
    my $read_ok = defined $bytes_read;
    if (!$read_ok) {
        FileUtils::printError('Failed to read source file (during ' . $mode . '): ' . $src . ' (' . $! . ')');
    }
    close($fhin);
    $thrift_client->close($fhout);
    # in general, the transfer has worked if the source was fully read and the
    # destination file exists
    my $result = $read_ok && $thrift_client->exists($dst_path);
    # if moving, remove the source file from the local filesystem - but only
    # once the copy has succeeded, otherwise the data would be lost
    if ($mode eq 'MOVE' && $result) {
        unlink($src);
        # update result to reflect if we successfully removed the src file
        $result = $result && (!-f $src);
    }
    return $result;
}
## transferFileFromLocal()


## @function transferFileToLocal()
#
# Move or copy a file from HDFS to the local filesystem. Data is read through
# thrift in 4K blocks and Base91 decoded before being written locally.
#
# @param  $mode  'MOVE' or 'COPY'
# @param  $src   HDThriftFS URI of the source file
# @param  $dst   local destination path, or an existing local directory
# @return true if the local destination was written and closed cleanly (and,
#         for MOVE, the HDFS source was then removed); 0 if the source is
#         missing or the destination exists. Dies if $dst cannot be opened.
#
sub transferFileToLocal {
    my ($mode, $src, $dst) = @_;
    # ensure we have a connection to the thrift server
    _establishClient();
    # source is remote
    my $src_path = _generateHDFSPath($src);
    if (!$thrift_client->exists($src_path)) {
        FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
        return 0;
    }
    if (-d $dst) {
        my ($filename) = $src =~ /([^\\\/]+)$/;
        $dst .= '/' . $filename;
    }
    if (-e $dst) {
        FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
        return 0;
    }
    # open local file
    my $fhout;
    my $encoded = undef;
    my $offset = 0;
    my $length = 4096; # Read 4K blocks at a time
    open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
    my $fhin = $thrift_client->open($src_path);
    # test definedness/length rather than truth, so a final chunk of "0" is
    # not dropped
    while (defined($encoded = $thrift_client->read($fhin, $offset, $length))
           && length($encoded) > 0) {
        if ($debug_encoding) {
            print STDERR "Reading Data: \n=== START ===\n";
            Dump($encoded);
            print STDERR "\n=== END ===\n\n";
        }
        #my $decoded = decode_base64($encoded);
        my $decoded = MIME::Base91::decode($encoded);
        if ($debug_encoding) {
            print STDERR "Decoded: \n=== START ===\n";
            Dump($decoded);
            print STDERR "\n=== END ===\n\n";
        }
        print $fhout $decoded;
        last if (length ($encoded) < $length);
        $offset += $length;
    }
    # buffered write errors only surface at close()
    my $close_ok = close($fhout);
    if (!$close_ok) {
        FileUtils::printError('Failed to close file after writing: ' . $dst . ' (' . $! . ')');
    }
    $thrift_client->close($fhin);
    # in general, the transfer has worked if the destination file was closed
    # cleanly and exists
    my $result = $close_ok && (-f $dst);
    # if moving, remove the source file from the HDFS filesystem - but only
    # once the local copy has succeeded, otherwise the data would be lost
    if ($mode eq 'MOVE' && $result) {
        $thrift_client->rm($src_path, 0);
        # update result to reflect if we successfully removed the src file
        $result = $result && !$thrift_client->exists($src_path);
    }
    return $result;
}
## transferFileToLocal()

1;