################################################################################
#
# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
# allows for persistent connections and so is significantly faster than
# repeatedly starting Hadoop's Java application over and over. In order to
# connect to the Thrift server this code needs to know the host and port the
# server may be found on - information currently hard-coded near the top of the
# script. There are also a number of Perl module API 'bindings' generated by
# the Thrift compilation process... currently located within the packages of
# the Parallel Processing extension. Note that I make use of some tie() magic
# so as to provide calling code with 'file handle'-like objects to interact
# with (print, readline etc), so that is pretty cool.
package FileUtils::HDThriftFS;

# Pragma
use strict;
use warnings;

# Setup Environment
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GSDLCOLLECTDIR not set\n" unless defined $ENV{'GSDLCOLLECTDIR'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    $ENV{'PERL_VERSION'} = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    # Backticks preserve any trailing newline printed by the helper script,
    # which must not end up inside the @INC path built below.
    chomp($ENV{'PERL_VERSION'}) if defined $ENV{'PERL_VERSION'};
  }
  die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}

# Modules - Core
use Devel::Peek;
use MIME::Base64;
use POSIX qw(floor);
use Scalar::Util qw(reftype);
use Symbol;
use Thrift::Socket;
use Thrift::BufferedTransport;
use Thrift::BinaryProtocol;
# Modules - Thrift
use HadoopFS::FileSystem;
# Modules - Greenstone
use FileUtils::HDThriftFS::ThriftFH;
use MIME::Base91;

# Configuration
# - default Thrift server location; may be overridden by etc/thrift.conf in
#   the collection directory (see _establishClient())
my $host = "localhost";
my $port = 58660;
# - set to a true value to have _printDebug() write messages to STDERR
my $debug = 0;
# - set to a true value to Dump() raw and encoded buffers during transfers
my $debug_encoding = 0;
# Size (in bytes) of each chunk requested from / sent to the Thrift server.
# NOTE(review): an earlier comment claimed 64k tested as near-optimal, yet the
# active setting is 512k - confirm which is intended.
#my $buffer_length =    4 * 1024; #   4k blocks
#my $buffer_length =    8 * 1024; #   8k blocks
#my $buffer_length =   16 * 1024; #  16K blocks
#my $buffer_length =   32 * 1024; #  32k blocks
#my $buffer_length =   64 * 1024; #  64k blocks
#my $buffer_length =  128 * 1024; # 128k blocks
#my $buffer_length =  256 * 1024; # 256k blocks
my $buffer_length =   512 * 1024; # 512k blocks
#my $buffer_length = 1024 * 1024; #   1M blocks
#my $buffer_length = 2048 * 1024; #   2M blocks
## These cause "OUT OF MEMORY" errors on Medusa
#my $buffer_length = 4096 * 1024; #   4M blocks
#my $buffer_length = 8192 * 1024; #   8M blocks

# Globals
# - both are populated lazily by _establishClient()
my $transport;
my $thrift_client;


## @function END()
#
# Ensure the transport layer, if open, is properly closed
#
END
{
  if (defined $transport)
  {
    $transport->close();
  }
}
## END()


## @function _establishClient()
#
# Lazily create the shared Thrift client (and its transport) the first time it
# is needed. If <GSDLCOLLECTDIR>/etc/thrift.conf exists and starts with
# "host:port", those values replace the built-in defaults. Connection failures
# are reported through FileUtils::printError().
#
sub _establishClient
{
  if (!defined $thrift_client)
  {
    # Look for a configuration file to override the default localhost:58660
    # settings
    my $conf_file_path = &FileUtils::filenameConcatenate($ENV{'GSDLCOLLECTDIR'}, 'etc', 'thrift.conf');
    if (&FileUtils::fileExists($conf_file_path))
    {
      print "Found Thrift configuration file:\n";
      my $conf_raw = &FileUtils::fileGetContents($conf_file_path);
      if ($conf_raw =~ /^([^:]+):(\d+)/)
      {
        $host = $1;
        print " - Host: " . $host . "\n";
        $port = $2;
        print " - Port: " . $port . "\n";
      }
    }
    print "Establish Thrift client connecting to: $host:$port\n";
    my $socket = Thrift::Socket->new($host, $port);
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);
    $transport = Thrift::BufferedTransport->new($socket);
    my $protocol = Thrift::BinaryProtocol->new($transport);
    $thrift_client = HadoopFS::FileSystemClient->new($protocol);
    eval { $transport->open(); };
    if ($@)
    {
      # $@ may be a hash-based exception object or a plain string; only
      # dereference it when it really is hash-based, otherwise strict refs
      # would turn the error report itself into a second failure.
      my $error = $@;
      my $error_type = reftype($error);
      my $reason = $error;
      if (defined $error_type && $error_type eq 'HASH' && defined $error->{message})
      {
        $reason = $error->{message};
      }
      &FileUtils::printError('Unable to connect: ' . $reason, 1);
    }
  }
}
## _establishClient()


## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://host:port/some/path' string into a
# HadoopFS::Pathname object (protocol, host and port are stripped). Values that
# already are HadoopFS::Pathname objects are returned untouched. A string that
# does not contain the HDThriftFS protocol is reported as an error and
# returned unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  if (ref($path) ne 'HadoopFS::Pathname')
  {
    if ($path !~ /HDThriftFS:\/\//)
    {
      &FileUtils::printError('Not a valid thrift URI: ' . $path);
    }
    else
    {
      # Remove protocol and any host and port information
      $path =~ s/HDThriftFS:\/\/[^\/]*//;
      $path = HadoopFS::Pathname->new( { pathname => $path } );
    }
  }
  return $path;
}
## _generateHDFSPath()


## @function _printDebug()
#
# When $debug is enabled, write the message to STDERR prefixed with the name
# of the calling function.
#
sub _printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    my ($package, $filename, $line, $function) = caller(1);
    print STDERR '[DEBUG] ' . $function . ': ' . $msg . "\n";
  }
}
## _printDebug()


################################################################################
################################################################################
################################################################################


## @function canRead()
#
# Shortcut for checkPermission() with mode 'r'. Any further arguments
# (username, usergroup) are passed straight through.
#
sub canRead
{
  my $path = shift(@_);
  return &checkPermission($path, 'r', @_);
}
## canRead()


## @function checkPermission()
#
# Determine whether a user may access a path with the given mode.
#
# @param path      thrift URI string or HadoopFS::Pathname
# @param mode      one of 'r', 'w' or 'x'
# @param username  (optional) defaults to the current login
# @param usergroup (optional) a group name, or a hash reference keyed by
#                  group name; defaults to the output of `groups`
# @return 1 if access is permitted, 0 otherwise (including unknown modes)
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  my $offsets = {'r' => 0, 'w' => 1, 'x' => 2};
  # - reject modes we have no offset for; otherwise a mode such as '-' could
  #   accidentally "match" an unset permission character
  if (!defined $mode || !exists $offsets->{$mode})
  {
    &FileUtils::printError('Unknown or unsupported permission mode: ' . (defined $mode ? $mode : 'undef'));
    return 0;
  }
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($ENV{'GSDLOS'} =~ /^windows$/i)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - determine the group
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  else
  {
    if ($ENV{'GSDLOS'} =~ /^windows$/i)
    {
      # dunno
    }
    else
    {
      my $raw_groups = `groups`;
      foreach my $group ( split(/\s/, $raw_groups))
      {
        $usergroups->{$group} = 1;
      }
    }
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  # - without a permission string nothing can be granted
  if (!defined $permissions)
  {
    return 0;
  }
  # Begin the cascade of tests to determine if the identified user belonging to
  # the identified group can perform 'mode' access to the file.
  # NOTE(review): the offsets assume a nine character 'rwxrwxrwx' style string
  # with no leading file-type character - confirm against the Thrift server.
  my $has_permission = 0;
  # - start with [u]ser permission
  if (defined $owner && defined $username && $username eq $owner)
  {
    my $target_char = substr($permissions, $offsets->{$mode}, 1);
    if (defined $target_char && $mode eq $target_char)
    {
      $has_permission = 1;
    }
  }
  # - failing that, try [g]roup level permissions
  if (!$has_permission && defined $group && defined $usergroups->{$group})
  {
    my $target_char = substr($permissions, 3 + $offsets->{$mode}, 1);
    if (defined $target_char && $mode eq $target_char)
    {
      $has_permission = 1;
    }
  }
  # - and finally try [o]ther level permission
  if (!$has_permission)
  {
    my $target_char = substr($permissions, 6 + $offsets->{$mode}, 1);
    if (defined $target_char && $mode eq $target_char)
    {
      $has_permission = 1;
    }
  }
  return $has_permission;
}
## checkPermission


## @function closeFileHandle()
#
# Close and untie a handle previously created by openFileHandle().
#
# @param fh_ref reference to the scalar holding the handle
# @return always 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  &_printDebug('');
  close($$fh_ref);
  untie($$fh_ref);
  return 1;
}
## closeFileHandle()


## @function fileSize()
#
# @param path thrift URI string or HadoopFS::Pathname
# @return the 'length' field reported by the server's stat() for the path
#
sub fileSize
{
  # (any extra arguments callers may pass are ignored)
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  my $file_stat = $thrift_client->stat($path);
  return $file_stat->{length};
}
## fileSize()


## @function fileTest()
#
# Emulate a subset of Perl's file test operators against the HDFS.
#
# @param raw_path thrift URI string or HadoopFS::Pathname
# @param test_op  '-d', '-e' or '-f'; undefined or '-l' are treated as '-e'
# @return 1 if the test passes, 0 otherwise (unsupported operators are
#         reported as an error and yield 0)
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  my $result = 0;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  if (!defined $test_op || $test_op eq '-l')
  {
    $test_op = '-e';
  }
  if ($test_op eq '-d')
  {
    if ($thrift_client->exists($path))
    {
      my $file = $thrift_client->stat($path);
      if ($file->{'isdir'})
      {
        $result = 1;
      }
    }
  }
  elsif ($test_op eq '-e')
  {
    if ($thrift_client->exists($path))
    {
      $result = 1;
    }
  }
  elsif ($test_op eq '-f')
  {
    if ($thrift_client->exists($path))
    {
      my $file = $thrift_client->stat($path);
      if (!$file->{'isdir'})
      {
        $result = 1;
      }
    }
  }
  else
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
  }
  return $result;
}
## fileTest()


## @function filenameConcatenate()
#
# Join path parts with '/'. The first argument is treated as the protocol
# prefix and is re-attached after repeated slashes have been collapsed in the
# remainder, so the slashes inside the prefix itself are left intact.
#
# @return the joined path with a single trailing slash (if any) removed
#
sub filenameConcatenate
{
  my $protocol = shift(@_);
  my $filename = join('/', @_);
  # remove repeated slashes
  $filename =~ s/[\/]+/\//g;
  # prepend protocol (which may cause multiple slashes)
  $filename = $protocol . '/' . $filename;
  # strip any trailing slashes
  $filename =~ s/[\\\/]$//;
  return $filename;
}
## filenameConcatenate()


## @function isFilenameAbsolute()
#
# @return always 1
#
sub isFilenameAbsolute
{
  # File paths against HDFS must be.
  return 1;
}
## isFilenameAbsolute()


## @function isHDFS
#
# @return always 1
#
sub isHDFS
{
  return 1;
}
## isHDFS()


## @function makeDirectory()
#
# Create the directory (and, via the server's mkdirs(), presumably any missing
# parents - TODO confirm) unless it already exists.
#
# @return the result of testing that the directory now exists
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  if (!&fileTest($path, '-d'))
  {
    # - create the directory
    $thrift_client->mkdirs($path);
  }
  # - check that it exists
  return (&fileTest($path, '-d'));
}
## makeDirectory()


## @function modificationTime()
#
# @return the server reported modificationTime divided by 1000 and rounded
#         down (presumably milliseconds to seconds - TODO confirm)
#
sub modificationTime
{
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  my $file_stat = $thrift_client->stat($path);
  return floor($file_stat->{modificationTime} / 1000);
}
## modificationTime()


## @function openFileHandle()
#
# Create a handle tied to FileUtils::HDThriftFS::ThriftFH, open it against the
# given path and store it in the caller's scalar.
#
# @param raw_path thrift URI string or HadoopFS::Pathname
# @param mode     passed through to the tied handle's open
# @param fh_ref   reference to the scalar that receives the handle
# @return 1 on success; dies if the open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  &_printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $fh = gensym();
  tie(*$fh, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # the handle is tied, so the arguments are handed to ThriftFH rather than
  # interpreted by Perl's builtin open
  open($fh, $path, $mode) or die("Failed to open thriftfs");
  $$fh_ref = $fh;
  return 1;
}
## openFileHandle()


## @function readDirectory()
#
# @return reference to an array holding the final path component of every
#         entry the server lists for the directory
#
sub readDirectory
{
  my ($raw_path) = @_;
  my @files;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $raw_files = $thrift_client->listStatus($path);
  if ($raw_files && @{$raw_files} > 0)
  {
    foreach my $file_stat (@{$raw_files})
    {
      my $file_path = $file_stat->{'path'};
      my ($filename) = $file_path =~ /([^\\\/]+)$/;
      push(@files, $filename);
    }
  }
  return \@files;
}
## readDirectory()


## @function removeFiles()
#
# Remove a path. Directories are only removed when $recursive is true.
#
# @return true if something was removed and the path no longer exists,
#         0 otherwise
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  my $result = 0;
  if (!defined $recursive)
  {
    $recursive = 0;
  }
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = &_generateHDFSPath($path);
  if ($thrift_client->exists($path) && ($recursive || &fileTest($path, '-f')))
  {
    $thrift_client->rm($path, $recursive);
    $result = !$thrift_client->exists($path);
  }
  return $result;
}
## removeFiles()


## @function removeFilesFiltered()
#
# Depth first removal of files and directories.
#
# @param paths     a single path string or a reference to an array of them
# @param accept_re (optional) only files whose path matches are removed
# @param reject_re (optional) files whose path matches are never removed
# @return the number of files and directories removed
#
# Directories themselves are only removed when neither expression is given.
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # Perform a depth first, recursive, removal of files and directories that
  # match the given accept and reject patterns
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $num_removed = 0;
  foreach my $raw_path (@paths_array)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = &_generateHDFSPath($raw_path);
    if (!$thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
    }
    elsif (&fileTest($path, '-d'))
    {
      my @files = @{&readDirectory($path)};
      foreach my $file (@files)
      {
        my $child_path = $raw_path . '/' . $file;
        $num_removed += &removeFilesFiltered($child_path, $accept_re, $reject_re);
      }
      if (!defined $accept_re && !defined $reject_re)
      {
        # remove this directory - non-recursively so that the command fails
        # if there are (somehow) still files contained within
        $thrift_client->rm($path, 0);
        if ($thrift_client->exists($path))
        {
          print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove directory: " . $raw_path . "\n";
        }
        else
        {
          $num_removed++;
        }
      }
    }
    else
    {
      if (defined $reject_re && ($raw_path =~ m/$reject_re/))
      {
        next;
      }
      if ((!defined $accept_re) || ($raw_path =~ m/$accept_re/))
      {
        # remove this file
        $thrift_client->rm($path, 0);
        if ($thrift_client->exists($path))
        {
          print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove file: " . $raw_path . "\n";
        }
        else
        {
          $num_removed++;
        }
      }
    }
  }
  return $num_removed;
}
## removeFilesFiltered()


## @function removeFilesRecursive()
#
# @return the number of files and directories removed
#
sub removeFilesRecursive
{
  my ($path) = @_;
  # use the more general removeFilesFiltered() function with no accept or reject
  # expressions
  return &removeFilesFiltered($path, undef, undef);
}
## removeFilesRecursive()


## @function supportsSymbolicLink
#
# @return always 0
#
sub supportsSymbolicLink
{
  return 0;
}
## supportsSymbolicLink()


## @function transferFile()
#
# Move or copy a file from one HDFS location to another.
#
# @param mode 'MOVE' or 'COPY'
# @param src  thrift URI of the source file
# @param dst  thrift URI of the destination; if it is an existing directory
#             the source's filename is appended
# @return 0 if the source is missing or the destination already exists,
#         otherwise whether the destination exists after the transfer
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as it doesn't seem
    #   thrift supports writing bytes
    # - only strings. May need to see if I can make Perl behave using black
    #   magic flags (marking string as binary etc) It'll work fine for text
    #   files though
    my $data = undef;
    my $offset = 0;
    # Read $buffer_length sized blocks at a time
    # - test for defined-and-non-empty rather than plain truth, as a block
    #   consisting solely of "0" is false in Perl but is still valid data
    while (defined($data = $thrift_client->read($fhin, $offset, $buffer_length)) && length($data) > 0)
    {
      $thrift_client->write($fhout, $data);
      $offset += $buffer_length;
      if (length ($data) < $buffer_length)
      {
        last;
      }
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()


## @function transferFileFromLocal()
#
# Move or copy a file from the local filesystem into the HDFS. Each block read
# locally is Base91 encoded before being written through Thrift.
#
# @param mode 'MOVE' or 'COPY'
# @param src  local path of the source file
# @param dst  thrift URI of the destination; if it is an existing directory
#             the source's filename is appended
# @return 0 if the destination file already exists, otherwise whether the
#         destination exists afterwards (and, for MOVE, the local source was
#         removed). Dies if the local file cannot be opened.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $fhout = $thrift_client->create($dst_path);
  while (read($fhin, $decoded, $buffer_length))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n";
      Dump($decoded);
      print STDERR "\n=== END ===\n\n";
    }
    # Base64 encode to protect binary
    #my $encoded = encode_base64($decoded);
    # Base91 encode to protect binary
    # NOTE(review): the original note says a Byte Order Marker is added so the
    # Thrift server can detect the need to decode - presumably that happens
    # inside MIME::Base91::encode(); confirm.
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n";
      Dump($encoded);
      print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if the destination file exists
  my $result = $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem - but only
  # once the destination is known to exist, so a failed transfer can never
  # destroy the only copy of the data
  if ($mode eq 'MOVE' && $result)
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()


## @function transferFileToLocal()
#
# Move or copy a file from the HDFS onto the local filesystem. Each block read
# through Thrift is Base91 decoded before being written locally.
#
# @param mode 'MOVE' or 'COPY'
# @param src  thrift URI of the source file
# @param dst  local destination path; if it is an existing directory the
#             source's filename is appended
# @return 0 if the source is missing or the destination already exists,
#         otherwise whether the local file was written (and, for MOVE, the
#         remote source was removed). Dies if the local file cannot be opened.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $encoded = undef;
  my $offset = 0;
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  # - test for defined-and-non-empty rather than plain truth, as a block
  #   consisting solely of "0" is false in Perl but is still valid data
  while (defined($encoded = $thrift_client->read($fhin, $offset, $buffer_length)) && length($encoded) > 0)
  {
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n";
      Dump($encoded);
      print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n";
      Dump($decoded);
      print STDERR "\n=== END ===\n\n";
    }
    print {$fhout} $decoded;
    if (length ($decoded) < $buffer_length)
    {
      last;
    }
    else
    {
      $offset += $buffer_length;
    }
  }
  # buffered write errors only surface when the handle is closed, so a failed
  # close means the local copy cannot be trusted
  my $closed_ok = close($fhout);
  if (!$closed_ok)
  {
    &FileUtils::printError('Failed to close file after writing: ' . $dst . ' (' . $! . ')');
  }
  $thrift_client->close($fhin);
  # in general, the transfer has worked if the destination file exists
  my $result = ($closed_ok && (-f $dst));
  # if moving, remove the source file from the HDFS filesystem - but only
  # once the local copy is known to be in place, so a failed transfer can
  # never destroy the only copy of the data
  if ($mode eq 'MOVE' && $result)
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()

1;