root/gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm @ 27424

Revision 27424, 19.0 KB (checked in by jmt12, 6 years ago)

Adding canRead() and isAbsolute() functions, and some more debugging while tracking down issue with files of 4000-8000 bytes failing

Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as a client-server 'relay' between the Perl code and the HDFS.
27# It allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application. In order to connect to the
29# Thrift server this code needs to know the host and port the server may be
30# found on - information currently hard-coded near the top of the module.
31# There are also a number of Perl module API 'bindings' generated by the
32# Thrift compilation process, currently located within the packages of the
33# Parallel Processing extension. Note that tie() magic is used to provide
34# calling code with 'file handle'-like objects to interact with (print,
35# readline etc).
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
42# Setup Environment
BEGIN
{
  # Refuse to load unless the environment says where the parallel-building
  # extension, and the Perl libraries bundled with it, can be found.
  foreach my $required_var ('GEXTPARALLELBUILDING', 'GSDLOS', 'PERL_VERSION')
  {
    die $required_var . " not set\n" unless (defined $ENV{$required_var});
  }
  my $extension_dir = $ENV{'GEXTPARALLELBUILDING'};
  # - Bit::Vector and Thrift modules (OS and Perl-version specific)
  my $arch_lib_dir = $extension_dir . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'};
  # - ThriftFS Perl API generated by the Thrift compiler
  my $gen_perl_dir = $extension_dir . '/packages/hadoop-1.1.0/contrib/thriftfs/gen-perl';
  # Both are prepended; the generated API ends up first in @INC, followed by
  # the architecture-specific library directory.
  unshift(@INC, $gen_perl_dir, $arch_lib_dir);
}
53
54# Modules - Core
55use Devel::Peek;
56use MIME::Base64;
57use POSIX qw(floor);
58use Symbol;
59use Thrift::Socket;
60use Thrift::BufferedTransport;
61use Thrift::BinaryProtocol;
62# Modules - Thrift
63use HadoopFS::FileSystem;
64# Modules - Greenstone
65use FileUtils::HDThriftFS::ThriftFH;
66use MIME::Base91;
67
# Configuration - where the Thrift server relaying requests to HDFS listens
my $host = 'localhost';
my $port = 58660;
# Diagnostics: $debug enables _printDebug() output, $debug_encoding dumps
# transfer data before and after Base91 encoding/decoding
my $debug = 0;
my $debug_encoding = 0;

# Globals - created on demand by _establishClient() and shared by every call
my $transport;
my $thrift_client;
77
78
## @function END()
#
# Module teardown: if a Thrift transport was ever created, close it so the
# connection to the Thrift server is released.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
91
92
## @function _establishClient()
#
# Lazily create the module-wide Thrift client, and the buffered transport it
# talks through, the first time any filesystem function needs it. Once the
# client exists, further calls do nothing.
#
# A failure to open the transport is reported through FileUtils::printError()
# with a second argument of 1 (NOTE(review): presumably marks the error as
# fatal - confirm against FileUtils).
#
sub _establishClient
{
  if (!defined $thrift_client)
  {
    _printDebug('Create Thrift Client to ' . $host . ':' . $port);
    my $socket = Thrift::Socket->new($host, $port);
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);

    $transport = Thrift::BufferedTransport->new($socket);
    my $protocol = Thrift::BinaryProtocol->new($transport);
    $thrift_client = HadoopFS::FileSystemClient->new($protocol);

    eval { $transport->open(); };
    if ($@)
    {
      my $error = $@;
      # Thrift exceptions are hash-based objects with a 'message' field, but
      # anything else dying inside open() arrives as a plain string.
      # Dereferencing a string as a hash would itself die under 'use strict'
      # and hide the original cause.
      require Scalar::Util;
      my $message = $error;
      if (ref($error) && Scalar::Util::reftype($error) eq 'HASH' && defined $error->{message})
      {
        $message = $error->{message};
      }
      FileUtils::printError('Unable to connect: ' . $message, 1);
    }
  }
}
## _establishClient()
116
117
## @function _generateHDFSPath()
#
# Convert an 'HDThriftFS://' URI into the HadoopFS::Pathname object that the
# Thrift API expects. Values that are already HadoopFS::Pathname objects are
# passed through untouched.
#
# @param  $path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @return a HadoopFS::Pathname whose pathname is everything after the
#         'HDThriftFS://' prefix. If $path is not a valid thrift URI, the
#         error is reported and $path is returned unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  if (ref($path) ne 'HadoopFS::Pathname')
  {
    # Only strip the protocol when it really is a prefix. The old check
    # matched it anywhere in the string and then removed the first 13
    # characters regardless.
    my $hdfs_pathname = $path;
    if (defined $hdfs_pathname && $hdfs_pathname =~ s/^HDThriftFS:\/\///)
    {
      $path = HadoopFS::Pathname->new( { pathname => $hdfs_pathname } );
    }
    else
    {
      # _printError() does not exist in this module (calling it died with
      # "Undefined subroutine"), so report the error through FileUtils as
      # the other functions here do.
      FileUtils::printError('Not a valid thrift URI: ' . (defined $path ? $path : 'undef'));
    }
  }
  return $path;
}
## _generateHDFSPath()
137
138
## @function _printDebug()
#
# While the module-level $debug switch is on, write a diagnostic line to
# STDERR, prefixed with the name of the function that called us.
#
# @param $msg  text to report
#
sub _printDebug
{
  my ($msg) = @_;
  return unless ($debug);
  # caller(1) describes the frame of whoever invoked _printDebug(); element 3
  # is that function's fully-qualified name
  my $function = (caller(1))[3];
  print STDERR "[DEBUG] $function: $msg\n";
}
## _printDebug()
151
152################################################################################
153################################################################################
154################################################################################
155
156
## @function canRead()
#
# Convenience wrapper asking checkPermission() about read ('r') access.
# Any further arguments (username, usergroup) are forwarded unchanged.
#
# @param  $path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @return whatever checkPermission() returns
#
sub canRead
{
  my ($path, @identity) = @_;
  return checkPermission($path, 'r', @identity);
}
## canRead()
165
166
## @function checkPermission()
#
# Decide whether a user may perform one kind of access on an HDFS file. The
# decision uses the 'rwxrwxrwx'-style permission string, owner and group
# returned by the Thrift stat() call.
#
# As in HDFS itself, exactly one permission class is consulted:
# - the owner bits, if the user owns the file;
# - otherwise the group bits, if the user belongs to the file's group;
# - otherwise the 'other' bits.
# Previously a denial at one level fell through to the next, so an owner
# without read permission was still reported as able to read a
# world-readable file.
#
# @param  $path       HDThriftFS:// URI string or HadoopFS::Pathname object
# @param  $mode       'r', 'w' or 'x'
# @param  $username   (optional) user to test; defaults to the user running
#                     this process
# @param  $usergroup  (optional) a group name, or a hash reference keyed by
#                     group name; defaults to the groups listed by the
#                     'groups' command (none at all on Windows)
# @return 1 if the access is permitted, otherwise 0. An unknown mode is
#         reported as an error and answered with 0.
#
# NOTE(review): the HDFS superuser, which bypasses these checks, is not
# modelled here.
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # offset of each access type within one 'rwx' permission class
  my $offsets = {'r' => 0, 'w' => 1, 'x' => 2};
  # - an unknown mode used to look up an undefined offset, which substr()
  #   treats as 0, silently turning it into a read test
  if (!defined $mode || !exists $offsets->{$mode})
  {
    FileUtils::printError('Unknown or unsupported permission mode: ' . (defined $mode ? $mode : 'undef'));
    return 0;
  }
  # - ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  $path = _generateHDFSPath($path);
  # - determine the user (defaults to current user)
  $username = _currentUsername() if (!defined $username);
  # - determine the group(s)
  my $usergroups;
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq 'HASH') ? $usergroup : {$usergroup => 1};
  }
  else
  {
    $usergroups = _currentUserGroups();
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  return 0 if (!defined $permissions);
  # Select the single applicable class: [u]ser (0), [g]roup (3), [o]ther (6)
  my $class_offset = 6;
  if (defined $owner && defined $username && $username eq $owner)
  {
    $class_offset = 0;
  }
  elsif (defined $group && defined $usergroups->{$group})
  {
    $class_offset = 3;
  }
  my $target_char = substr($permissions, $class_offset + $offsets->{$mode}, 1);
  return (defined $target_char && $target_char eq $mode) ? 1 : 0;
}
## checkPermission()

## @function _currentUsername()
#
# Login name of the user running this process: Win32::LoginName() on
# Windows, otherwise getlogin() falling back to the password entry for $<.
#
sub _currentUsername
{
  if ($ENV{'GSDLOS'} =~ /^windows$/i)
  {
    require Win32;
    return Win32::LoginName();
  }
  # scalar() so getpwuid() yields just the name, whatever our caller's context
  my $name = getlogin() || scalar(getpwuid($<));
  return $name;
}
## _currentUsername()

## @function _currentUserGroups()
#
# Groups of the user running this process, according to the 'groups'
# command, as a hash reference keyed by group name. On Windows an empty hash
# reference is returned because no lookup is implemented there.
#
sub _currentUserGroups
{
  my $groups = {};
  if ($ENV{'GSDLOS'} !~ /^windows$/i)
  {
    my $raw_groups = `groups`;
    $raw_groups = '' if (!defined $raw_groups);
    # split on runs of whitespace so that no empty group name is recorded
    foreach my $group (split(' ', $raw_groups))
    {
      $groups->{$group} = 1;
    }
  }
  return $groups;
}
## _currentUserGroups()
249
250
## @function closeFileHandle()
#
# Close a handle produced by openFileHandle(), then remove the tie() that
# routed the handle's operations through FileUtils::HDThriftFS::ThriftFH.
#
# @param  $fh_ref  reference to the scalar holding the (glob reference) handle
# @return always 1
#
sub closeFileHandle
{
  my ($fh_ref) = @_;
  _printDebug('');
  my $fh = $$fh_ref;
  close($fh);
  # The tie was placed on the glob (*$fh), so untie the glob. untie() on the
  # scalar holding the glob reference only looks for a tied *scalar* and
  # silently does nothing.
  untie(*$fh);
  return 1;
}
## closeFileHandle()
262
263
## @function fileSize()
#
# Size of an HDFS file: the 'length' field of the Thrift stat() result.
#
# @param  $path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @return the reported length
#
sub fileSize
{
  my ($path) = @_;
  _establishClient();
  return $thrift_client->stat(_generateHDFSPath($path))->{length};
}
## fileSize()
277
278
## @function fileTest()
#
# HDFS counterpart of Perl's -d / -e / -f file tests.
#
# @param  $raw_path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @param  $test_op   '-d', '-e' or '-f'. '-l' and undef are treated as '-e'
#                    because HDFS has no symbolic links.
# @return 1 if the test holds, otherwise 0. Unsupported operators are
#         reported through FileUtils::printError() and also give 0.
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');
  # whether each test needs a directory (1), a non-directory (0) or either (undef)
  my %wants_directory = ('-d' => 1, '-e' => undef, '-f' => 0);
  if (!exists $wants_directory{$test_op})
  {
    FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }
  return 0 if (!$thrift_client->exists($path));
  my $want_directory = $wants_directory{$test_op};
  return 1 if (!defined $want_directory);
  my $is_directory = $thrift_client->stat($path)->{'isdir'} ? 1 : 0;
  return ($is_directory == $want_directory) ? 1 : 0;
}
## fileTest()
330
331
## @function filenameConcatenate()
#
# Join path components into a single thrift path.
#
# @param  $protocol  prefix placed before the joined components with a '/'
#                    separator (which may leave several slashes in a row)
# @param  @parts     path components, joined with '/'
# @return the joined path: runs of '/' within the components collapse to one,
#         and one trailing '/' or '\' is removed
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  my $joined = join('/', @parts);
  # squeeze runs of forward slashes down to a single slash
  $joined =~ tr{/}{}s;
  my $filename = "$protocol/$joined";
  $filename =~ s{[\\/]$}{};
  return $filename;
}
## filenameConcatenate()
347
348
## @function isFilenameAbsolute()
#
# Every path handed to HDFS through thrift has to be absolute, so the answer
# is always yes, whatever the argument.
#
# @return 1
#
sub isFilenameAbsolute
{
  return 1;
}
## isFilenameAbsolute()
357
358
## @function makeDirectory()
#
# Ensure a directory exists in HDFS. Thrift mkdirs() is only called when the
# path is not already a directory.
#
# @param  $raw_path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @return the result of fileTest($path, '-d') after any creation attempt
#
sub makeDirectory
{
  my ($raw_path) = @_;
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  $thrift_client->mkdirs($path) unless (fileTest($path, '-d'));
  return fileTest($path, '-d');
}
## makeDirectory()
378
379
## @function modificationTime()
#
# Last-modification time of an HDFS file. The stat() value is divided by
# 1000 and rounded down (presumably converting milliseconds to whole
# seconds).
#
# @param  $path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @return floor(modificationTime / 1000)
#
sub modificationTime
{
  my ($path) = @_;
  _establishClient();
  my $hdfs_path = _generateHDFSPath($path);
  my $raw_time = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
393
394
## @function openFileHandle()
#
# Produce a Perl file handle for an HDFS file. The handle is a fresh glob
# tied to FileUtils::HDThriftFS::ThriftFH (given the shared Thrift client),
# so that ordinary handle operations act on HDFS.
#
# @param  $raw_path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @param  $mode      open mode, passed through to the tied OPEN
# @param  $fh_ref    scalar reference that receives the new handle
# @return 1 (dies if the tied open fails)
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  _printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  my $handle = gensym();
  tie(*$handle, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # on a tied handle these arguments are handed to the tie class's OPEN
  # method in this order (path, then mode)
  if (!open($handle, $path, $mode))
  {
    die("Failed to open thriftfs");
  }
  $$fh_ref = $handle;
  return 1;
}
## openFileHandle()
412
413
## @function readDirectory()
#
# List the entries of an HDFS directory.
#
# @param  $raw_path  HDThriftFS:// URI string or HadoopFS::Pathname object
# @return array reference of entry names: the final component of each 'path'
#         reported by listStatus(). An empty array reference is returned if
#         nothing is listed.
#
sub readDirectory
{
  my ($raw_path) = @_;
  _establishClient();
  my $listing = $thrift_client->listStatus(_generateHDFSPath($raw_path));
  my @names = $listing
    ? map { my ($leaf) = $_->{'path'} =~ /([^\\\/]+)$/; $leaf } @{$listing}
    : ();
  return \@names;
}
## readDirectory()
436
437
## @function removeFiles()
#
# Delete a path from HDFS.
#
# @param  $path       HDThriftFS:// URI string or HadoopFS::Pathname object
# @param  $recursive  (optional, default 0) when false, only plain files are
#                     removed; directories are left alone
# @return 0 if nothing was attempted (missing path, or a directory without
#         $recursive); otherwise true exactly when the path no longer exists
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  _establishClient();
  $path = _generateHDFSPath($path);
  return 0 if (!$thrift_client->exists($path));
  # a non-file (i.e. directory) is only removed when recursion was requested
  return 0 if (!$recursive && !fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  return !$thrift_client->exists($path);
}
## removeFiles()
460
461
## @function removeFilesFiltered()
#
# Depth-first removal of HDFS files and, when unfiltered, directories.
#
# @param  $paths      a path, or an array reference of paths (HDThriftFS://
#                     URI strings); trailing slashes are ignored
# @param  $accept_re  (optional) only files whose path matches are removed
# @param  $reject_re  (optional) files whose path matches are never removed
# @return number of files and directories removed
#
# Directories are always descended into. A directory itself is only removed
# when neither pattern was supplied, and then non-recursively, so one that
# still has contents is left in place and reported. Missing paths and failed
# removals are reported on STDERR; they do not stop the traversal.
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  _establishClient();
  my $unfiltered = (!defined $accept_re && !defined $reject_re);
  my @targets = (ref $paths eq "ARRAY") ? @{$paths} : ($paths);
  my $removed = 0;
  TARGET: foreach my $target (@targets)
  {
    # remove trailing slashes
    $target =~ s/[\/\\]+$//;
    my $hdfs_path = _generateHDFSPath($target);
    if (!$thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $target . "\n";
      next TARGET;
    }
    my $failure_message;
    if (fileTest($hdfs_path, '-d'))
    {
      # empty the directory first...
      foreach my $entry (@{ readDirectory($hdfs_path) })
      {
        $removed += removeFilesFiltered($target . '/' . $entry, $accept_re, $reject_re);
      }
      # ...but only remove the directory itself when no filtering was asked for
      next TARGET if (!$unfiltered);
      $failure_message = "HDThriftFS::removeFilesFiltered() couldn't remove directory: ";
    }
    else
    {
      next TARGET if (defined $reject_re && $target =~ m/$reject_re/);
      next TARGET if (defined $accept_re && $target !~ m/$accept_re/);
      $failure_message = "HDThriftFS::removeFilesFiltered() couldn't remove file: ";
    }
    $thrift_client->rm($hdfs_path, 0);
    if ($thrift_client->exists($hdfs_path))
    {
      print STDERR $failure_message . $target . "\n";
    }
    else
    {
      $removed++;
    }
  }
  return $removed;
}
## removeFilesFiltered()
529
530
## @function removeFilesRecursive()
#
# Remove a path and everything beneath it, by calling removeFilesFiltered()
# with no accept or reject pattern.
#
# @param  $path  path, or array reference of paths
# @return number of files and directories removed
#
sub removeFilesRecursive
{
  my ($path) = @_;
  my ($no_accept, $no_reject);
  return removeFilesFiltered($path, $no_accept, $no_reject);
}
## removeFilesRecursive()
541
542
## @function supportsSymbolicLink()
#
# HDFS has no symbolic links, so this filesystem never supports them.
#
# @return 0
#
sub supportsSymbolicLink
{
  my $symlinks_supported = 0;
  return $symlinks_supported;
}
## supportsSymbolicLink()
550
551
## @function transferFile()
#
# Copy or move a file from one HDFS location to another.
#
# @param  $mode  'COPY' or 'MOVE'
# @param  $src   HDThriftFS:// URI of the source file
# @param  $dst   HDThriftFS:// URI of the destination file, or of an existing
#                directory in which a file named like the source is created
# @return the destination exists() result after the transfer, or 0 (after
#         reporting an error) if the source is missing or the destination
#         already exists
#
# MOVE is a single rename() on the server. COPY streams the content through
# this client in blocks of up to 4096 bytes, using the same conventions as
# the local transfer functions: read() results are Base91-decoded, and data
# is Base91-encoded before write().
# NOTE(review): like transferFileToLocal(), this assumes the Thrift server
# Base91-encodes what read() returns - confirm against the server.
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  my $src_path = _generateHDFSPath($src);
  my $dst_path = _generateHDFSPath($dst);
  # transferring into a directory keeps the source's filename
  if (fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = _generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Bound the copy by the file size reported by stat(). The old loop
    # stopped only when a read() result was shorter than the block size.
    # That result is Base91-encoded, and so longer than the bytes it
    # carries, which meant the loop could carry on reading past the end of
    # the file.
    my $src_size = $thrift_client->stat($src_path)->{length} || 0;
    my $fhin = $thrift_client->open($src_path);
    my $fhout = $thrift_client->create($dst_path);
    my $offset = 0;
    my $length = 4096;
    while ($offset < $src_size)
    {
      my $encoded = $thrift_client->read($fhin, $offset, $length);
      # an empty read means no further data is available
      last if (!defined $encoded || $encoded eq '');
      my $decoded = MIME::Base91::decode($encoded);
      last if (!defined $decoded || $decoded eq '');
      # re-encode (with the marker the server uses to detect encoded data)
      # so binary content survives the string-only write()
      $thrift_client->write($fhout, MIME::Base91::encode($decoded));
      # advance by the bytes actually received, in case of a short read
      $offset += length($decoded);
    }
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  return $thrift_client->exists($dst_path);
}
## transferFile()
619## transferFile()
620
621
## @function transferFileFromLocal()
#
# Copy or move a file from the local filesystem into HDFS.
#
# @param  $mode  'COPY' or 'MOVE' (MOVE also unlinks the local source)
# @param  $src   path of the local source file
# @param  $dst   HDThriftFS:// URI of the destination file, or of an existing
#                directory in which a file named like the source is created
# @return true when the destination exists afterwards (and, for MOVE, the
#         local source is gone); 0 if the destination file already exists
#
# Dies if the local source cannot be opened. Data is sent in chunks of up to
# 4096 bytes. Each chunk is Base91-encoded so binary content survives
# Thrift's string-only write(); the encoding adds a Byte Order Marker that
# lets the Thrift server detect it needs to decode.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  _establishClient();
  # destination is remote
  my $dst_path = _generateHDFSPath($dst);
  # copying into an existing directory keeps the source's filename
  if (fileTest($dst_path, '-d'))
  {
    my ($leaf) = $src =~ /([^\\\/]+)$/;
    $dst = $dst . '/' . $leaf;
    $dst_path = _generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (fileTest($dst_path, '-f'))
  {
    FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  open(my $local_in, '<:raw', $src)
    or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $chunk = '';
  my $remote_out = $thrift_client->create($dst_path);
  while (read($local_in, $chunk, 4096))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($chunk); print STDERR "\n=== END ===\n\n";
    }
    my $encoded = MIME::Base91::encode($chunk);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($remote_out, $encoded);
  }
  close($local_in);
  $thrift_client->close($remote_out);
  # the transfer is judged to have worked if the destination now exists
  my $success = $thrift_client->exists($dst_path);
  if ($mode eq 'MOVE')
  {
    unlink($src);
    # a move also requires the local source to be gone
    $success = $success && !(-f $src);
  }
  return $success;
}
## transferFileFromLocal()
679
680
## @function transferFileToLocal()
#
# Copy or move a file out of HDFS onto the local filesystem.
#
# @param  $mode  'COPY' or 'MOVE' (MOVE also removes the HDFS source)
# @param  $src   HDThriftFS:// URI of the source file
# @param  $dst   local destination path, or an existing local directory in
#                which a file named like the source is created
# @return true when the local destination exists afterwards (and, for MOVE,
#         the HDFS source is gone). Returns 0 (after reporting an error) if
#         the source is missing, the destination already exists, or writing
#         the local file failed.
#
# Dies if the local destination cannot be opened. Each block returned by
# thrift read() is Base91-decoded before being written out.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # source is remote
  my $src_path = _generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # Bound the reads by the file size from stat(). The old loop stopped only
  # once an *encoded* block came back shorter than 4096 characters. Base91
  # output is longer than its input, so a large enough final partial block
  # failed that test and the loop went on to read beyond the end of the file.
  my $src_size = $thrift_client->stat($src_path)->{length} || 0;
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  open(my $fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  while ($offset < $src_size)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $length);
    # an empty read means no further data is available
    last if (!defined $encoded || $encoded eq '');
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    last if (!defined $decoded || $decoded eq '');
    print $fhout $decoded;
    # advance by the bytes actually received, in case of a short read
    $offset += length($decoded);
  }
  # buffered write errors only surface when the handle is closed; check
  # before a MOVE deletes the only other copy of the data
  my $written = close($fhout);
  my $write_error = $!;
  $thrift_client->close($fhin);
  if (!$written)
  {
    FileUtils::printError('Failed to write file (during ' . $mode . '): ' . $dst . ' (' . $write_error . ')');
    return 0;
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (-f $dst);
  # if moving, remove the source file from the HDFS filesystem
  if ($mode eq 'MOVE')
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = $result && !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
742
743
# A module loaded via use/require must finish by evaluating to a true value
1;
Note: See TracBrowser for help on using the browser.