###############################################################################
#
# HDFSShell.pm -- file functions acting upon a HDFS via the CLI hadoop
# application
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

package FileUtils::HDFSShell;

# Pragma
use strict;

# Configuration
my $debug = 0;

################################################################################
######################### Private Functions & Variables ########################
################################################################################

## @function _executeHDFSCommand()
#
# Executes a HDFS command without caring about the resulting output
# while still reacting appropriately to failed executions.
#
sub _executeHDFSCommand
{
  my $return_result = shift(@_);
  if ($return_result != 0 && $return_result != 1)
  {
    &FileUtils::printError('Unexpected value for return_result argument - should be 0 or 1: ' . $return_result, 1);
  }
  my $command = &_generateHDFSCommand(@_);
  my $result = `$command 2>&1`;
  my $return_value = $?;
  &_printDebug(' -> _executeHDFSCommand() => |' . $result . '| [' . $return_value . ']');
  # sometimes we may want the actual resulting output returned, for
  # instance when parsing ls
  if ($return_result)
  {
    $return_value = $result;
  }
  return $return_value;
}
## _executeHDFSCommand()

## @function _generateHDFSCommand()
#
sub _generateHDFSCommand
{
  my $action = shift(@_);
  my @args = @_;
  my $arguments = '';
  foreach my $path (@args)
  {
    # Replace the prefix with one HDFS Shell understands
    $path =~ s/HDFSShell:/hdfs:/;
    # special case for standard streams
    if ($path eq '-')
    {
      $arguments .= '- ';
    }
    else
    {
      $arguments .= '"' . $path . '" ';
    }
  }
  my $command = 'hadoop fs -' . $action . ' ' . $arguments;
  &_printDebug(' -> _generateHDFSCommand("' . $action . '", ...) => |' . $command . '|');
  return $command;
}
## _generateHDFSCommand()
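# For example (illustrative values only - the host, port and path below are
# invented), a call such as:
#
#   &_generateHDFSCommand('ls', 'HDFSShell://localhost:9000/user/greenstone')
#
# would produce the command string:
#
#   hadoop fs -ls "hdfs://localhost:9000/user/greenstone"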
## @function _printDebug()
#
sub _printDebug
{
  my ($message) = @_;
  if ($debug)
  {
    print STDERR '[DEBUG] ' . $message . "\n";
  }
}
## _printDebug()

################################################################################
############################### Public Functions ###############################
################################################################################

## @function canRead()
#
sub canRead
{
  my $path = shift(@_);
  # On my Hadoop setups it appears everyone can read everything... pretty sure
  # this won't always be the case but I'm not sure if there is some easy way to
  # determine readability (you'd need to parse the permissions, user, and group
  # and then somehow compare to the current user). So instead I'll just return
  # whether the file exists.
  return &fileTest($path, '-f');
}
## canRead()

## @function closeFileHandle()
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  close($$fh_ref);
  return 1;
}
## closeFileHandle()

## @function fileSize()
#
sub fileSize
{
  my ($path) = @_;
  my $file_stats = &fileStats($path);
  return $file_stats->{'filesize'};
}
## fileSize()

## @function fileStats()
#
sub fileStats
{
  my ($path) = @_;
  my $stats = {};
  my $result = &_executeHDFSCommand(1, 'ls', $path);
  # - parse the results
  if ($result =~ /([ds\-][rwx\-]+)\s+(\d+)\s+([^\s]+)\s+([^\s]+)\s+(\d+)\s+(\d\d\d\d-\d\d-\d\d)\s+(\d\d:\d\d)\s+([^\s]+)$/)
  {
    $stats->{'filename'} = $8;
    $stats->{'replicas'} = $2;
    $stats->{'filesize'} = $5;
    $stats->{'modification_date'} = $6;
    $stats->{'modification_time'} = $7;
    $stats->{'permissions'} = $1;
    $stats->{'userid'} = $3;
    $stats->{'groupid'} = $4;
  }
  else
  {
    &FileUtils::printError('Failed to parse -ls result: ' . $result, 1);
  }
  return $stats;
}
## fileStats()
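# As a concrete (purely illustrative) example of the line fileStats() expects,
# older Hadoop releases print -ls entries along these lines:
#
#   -rw-r--r--   3 greenstone supergroup   12345 2013-05-28 14:02 hdfs://localhost:9000/user/greenstone/demo.xml
#
# which the regular expression above maps to permissions, replicas, userid,
# groupid, filesize, modification_date, modification_time and filename
# respectively. Newer Hadoop versions may format -ls output differently, in
# which case the pattern would need revisiting.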
## @function fileTest()
#
sub fileTest
{
  my ($filename_full_path, $test_op) = @_;
  # Sanity tests
  # Special case: HDFS doesn't support symlinking - swap for -e instead
  if (!defined $test_op || $test_op eq '-l')
  {
    $test_op = '-e';
  }
  my $retval = -1; # cmd return values > 0 are errors
  # Special case: the easiest way to support -f is to run a -e followed by a -d
  # (which should fail for files)
  if ($test_op eq '-f')
  {
    my $retval1 = &_executeHDFSCommand(0, 'test -e', $filename_full_path);
    if ($retval1 == 0)
    {
      my $retval2 = &_executeHDFSCommand(0, 'test -d', $filename_full_path);
      if ($retval2 > 0)
      {
        $retval = 0;
      }
    }
  }
  # very limited test op support for HDFS
  elsif ($test_op ne '-d' && $test_op ne '-e' && $test_op ne '-z')
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
  }
  else
  {
    $retval = &_executeHDFSCommand(0, 'test ' . $test_op, $filename_full_path);
  }
  return ($retval == 0 ? 1 : 0);
}
## fileTest()

## @function filenameConcatenate()
#
sub filenameConcatenate
{
  my $protocol = shift(@_);
  my $filename = join('/', @_);
  # remove repeated slashes
  $filename =~ s/[\/]+/\//g;
  # prepend protocol (which may cause multiple slashes)
  $filename = $protocol . '/' . $filename;
  # strip any trailing slashes
  $filename =~ s/[\\\/]$//;
  return $filename;
}
## filenameConcatenate()

## @function isFilenameAbsolute()
#
sub isFilenameAbsolute
{
  # File paths against HDFS must be absolute.
  return 1;
}
## isFilenameAbsolute()

## @function isHDFS()
#
sub isHDFS
{
  return 1;
}
## isHDFS()

## @function isSpecialDirectory()
#
sub isSpecialDirectory
{
  my ($path) = @_;
  return ($path =~ /^HDFSShell:\/\/[a-zA-Z]+:\d+$/);
}
## isSpecialDirectory()

## @function makeDirectory()
#
sub makeDirectory
{
  my ($dir) = @_;
  my $result = &_executeHDFSCommand(0, 'mkdir', $dir);
  # the HDFS mkdir command returns 0 on success, -1 on failure
  return ($result == 0 ? 1 : 0);
}
## makeDirectory()

## @function modificationTime()
#
sub modificationTime
{
  my ($path) = @_;
  &FileUtils::printWarning("modificationTime() not supported");
  my $file_stats = &fileStats($path);
  my $mod_date = $file_stats->{'modification_date'};
  $mod_date =~ /(\d\d\d\d)-(\d\d)-(\d\d)/;
  my $mod_year = $1;
  my $mod_month = $2;
  my $mod_day = $3;
  my $mod_time = $file_stats->{'modification_time'};
  $mod_time =~ /(\d\d):(\d\d)/;
  my $mod_hour = $1;
  my $mod_minute = $2;
  # the date and time are parsed out above but are never converted into an
  # epoch timestamp, hence the warning and the constant return value
  return 0;
}
## modificationTime()

## @function openFileHandle()
#
sub openFileHandle
{
  my ($path, $mode, $fh_ref) = @_;
  if ($mode eq '>>' || $mode eq 'a')
  {
    &FileUtils::printError('Append (>>) mode not supported', 1);
  }
  elsif ($mode eq '>' || $mode eq 'w')
  {
    # the put command fails if the file already exists
    if (&fileTest($path, '-e'))
    {
      &removeFiles($path);
    }
    open($$fh_ref, '| ' . &_generateHDFSCommand('put', '-', $path))
      or &FileUtils::printError('Failed to open pipe to HDFS (put) for writing: ' . $path, 1);
  }
  else
  {
    open($$fh_ref, &_generateHDFSCommand('cat', $path) . ' |')
      or &FileUtils::printError('Failed to open pipe to HDFS (cat) for reading: ' . $path, 1);
  }
  return 1;
}
## openFileHandle()
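# A brief (illustrative) sketch of how the pipe-based handles above might be
# used by a caller - the HDFS URL is invented, and in practice these functions
# are presumably reached via the generic FileUtils layer rather than called
# directly:
#
#   my $fh;
#   &FileUtils::HDFSShell::openFileHandle('HDFSShell://localhost:9000/user/greenstone/doc.xml', '>', \$fh);
#   print $fh '<doc/>' . "\n";   # streamed to HDFS via "hadoop fs -put - ..."
#   &FileUtils::HDFSShell::closeFileHandle(\$fh);
#
#   &FileUtils::HDFSShell::openFileHandle('HDFSShell://localhost:9000/user/greenstone/doc.xml', '<', \$fh);
#   my @lines = <$fh>;           # read back via "hadoop fs -cat ..."
#   &FileUtils::HDFSShell::closeFileHandle(\$fh);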
## @function readDirectory()
#
sub readDirectory
{
  my ($path) = @_;
  my @files;
  my $result = &_executeHDFSCommand(1, 'ls', $path);
  if ($result =~ /No such file or directory/)
  {
    &FileUtils::printError('Cannot read directory as path does not exist: ' . $path);
    return undef;
  }
  my @lines = split(/\r?\n/, $result);
  foreach my $line (@lines)
  {
    if ($line =~ /\/([^\/]+)$/)
    {
      my $file = $1;
      push(@files, $file);
    }
  }
  return \@files;
}
## readDirectory()

## @function removeFiles()
#
sub removeFiles
{
  my ($path, $including_dir) = @_;
  my $result;
  if (defined $including_dir && $including_dir)
  {
    $result = &_executeHDFSCommand(0, 'rmr', $path);
  }
  else
  {
    $result = &_executeHDFSCommand(0, 'rm', $path);
  }
  # the HDFS rm/rmr commands return 0 on success, -1 on failure
  return ($result == 0 ? 1 : 0);
}
## removeFiles()

## @function removeFilesFiltered()
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # Perform a depth first, recursive, removal of files and directories that
  # match the given accept and reject patterns
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $num_removed = 0;
  foreach my $path (@paths_array)
  {
    # remove trailing slashes
    $path =~ s/[\/\\]+$//;
    if (!&fileTest($path, '-e'))
    {
      &FileUtils::printError('path does not exist: ' . $path);
    }
    elsif (&fileTest($path, '-d'))
    {
      my @files = @{&readDirectory($path)};
      foreach my $file (@files)
      {
        my $child_path = $path . '/' . $file;
        $num_removed += &removeFilesFiltered($child_path, $accept_re, $reject_re);
      }
      if (!defined $accept_re && !defined $reject_re)
      {
        # remove this directory
        my $result = &removeFiles($path, 1);
        if ($result != 1)
        {
          &FileUtils::printError('could not remove directory: ' . $path);
        }
        else
        {
          $num_removed++;
        }
      }
    }
    else
    {
      if (defined $reject_re && ($path =~ m/$reject_re/))
      {
        next;
      }
      if ((!defined $accept_re) || ($path =~ m/$accept_re/))
      {
        # remove this file
        my $result = &removeFiles($path);
        if ($result != 1)
        {
          &FileUtils::printError('could not remove file: ' . $path);
        }
        else
        {
          $num_removed++;
        }
      }
    }
  }
  return $num_removed;
}
## removeFilesFiltered()

## @function removeFilesRecursive()
#
sub removeFilesRecursive
{
  my ($path) = @_;
  # use the more general removeFilesFiltered() function with no accept
  # or reject expressions
  return &removeFilesFiltered($path, undef, undef);
}
## removeFilesRecursive()

## @function supportsSymbolicLink()
#
sub supportsSymbolicLink
{
  return 0;
}
## supportsSymbolicLink()

## @function transferFile()
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  my $result;
  if ($mode eq 'COPY')
  {
    $result = &_executeHDFSCommand(0, 'cp', $src, $dst);
  }
  else
  {
    $result = &_executeHDFSCommand(0, 'mv', $src, $dst);
  }
  # the HDFS cp/mv commands return 0 on success, -1 on failure
  return ($result == 0 ? 1 : 0);
}
## transferFile()

## @function transferFileFromLocal()
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  if (!-f $src)
  {
    &FileUtils::printError('Source file (during ' . $mode . ') doesn\'t exist: ' . $src);
  }
  if (&fileTest($dst, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (&fileTest($dst, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
  }
  my $result = &_executeHDFSCommand(0, 'put', $src, $dst);
  my $remove_result = 1;
  if ($mode eq 'MOVE')
  {
    unlink($src);
    # failed to delete somehow
    if (-f $src)
    {
      $remove_result = 0;
    }
  }
  return ($result == 0 && $remove_result ? 1 : 0);
}
## transferFileFromLocal()

## @function transferFileToLocal()
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  if (!&fileTest($src, '-f'))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
  }
  my $result = &_executeHDFSCommand(0, 'get', $src, $dst);
  my $remove_result = 1;
  if ($mode eq 'MOVE')
  {
    $remove_result = &removeFiles($src);
  }
  return ($result == 0 && $remove_result ? 1 : 0);
}
## transferFileToLocal()

1;
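## Illustrative usage sketch (comments only, not executed as part of this
## module - the URL and pattern below are invented for the example).
## removeFilesFiltered() walks a directory depth first and, when an accept
## expression is supplied, only deletes files whose paths match it, so
## clearing stale .tmp files under an import area might look like:
##
##   my $removed = &FileUtils::HDFSShell::removeFilesFiltered(
##       'HDFSShell://localhost:9000/user/greenstone/import',
##       qr/\.tmp$/, undef);
##   print "removed $removed temporary file(s)\n";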