root/gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm @ 27386

Revision 27386, 16.3 KB (checked in by jmt12, 6 years ago)

Forgot these were just symbolic links to my Dropbox folder - adding in the actual files

Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
# Thrift acts as a client-server 'relay' between the Perl code and the HDFS.
# It allows for persistent connections and so is significantly faster than
# repeatedly starting Hadoop's Java application. In order to connect to the
# Thrift server this code needs to know the host and port the server may be
# found on - information currently hard-coded near the top of the script.
# There are also a number of Perl module API 'bindings' generated by the
# Thrift compilation process... currently located within the packages of the
# Parallel Processing extension.  Note that I make use of some tie() magic so
# as to provide calling code with 'file handle'-like objects to interact with
# (print, readline etc), so that is pretty cool.
36
package FileUtils::HDThriftFS;

# Pragma
use strict;
use warnings;

# Setup Environment
# - the Thrift runtime and the generated ThriftFS bindings are shipped inside
#   the Parallel Building extension, so @INC has to be extended at compile
#   time, before the 'use' statements that follow are resolved
BEGIN
{
  # all three variables are needed to build the library paths below
  foreach my $required_var ('GEXTPARALLELBUILDING', 'GSDLOS', 'PERL_VERSION')
  {
    die $required_var . " not set\n" unless defined $ENV{$required_var};
  }
  my $extension_home = $ENV{'GEXTPARALLELBUILDING'};
  # Bit::Vector and Thrift modules
  unshift (@INC, $extension_home . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $extension_home . '/packages/hadoop-1.1.0/contrib/thriftfs/gen-perl');
}
53
54# Modules - Core
55use Devel::Peek;
56use MIME::Base64;
57use POSIX qw(floor);
58use Symbol;
59use Thrift::Socket;
60use Thrift::BufferedTransport;
61use Thrift::BinaryProtocol;
62# Modules - Thrift
63use HadoopFS::FileSystem;
64# Modules - Greenstone
65use FileUtils::HDThriftFS::ThriftFH;
66use MIME::Base91;
67
# Configuration
# - where the Thrift server relaying requests to the HDFS can be found
my $host = "localhost";
my $port = 58660;
# - when true the local transfer functions dump every block (before and after
#   encoding) to STDERR
my $debug_encoding = 0;

# Globals
# - the shared connection, populated by _establishClient() on first use
my ($transport, $thrift_client);
76
77
## @function END()
#
# Runs when the interpreter shuts down. Closes the transport layer, but only
# if a transport was ever created.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
90
91
## @function _establishClient()
#
# Lazily creates the module-wide Thrift client (and the transport it talks
# over) using the configured host and port. Does nothing if a client has
# already been established.
#
# The client and transport are only remembered once the transport has been
# opened successfully, so a failed attempt does not leave a dead client cached
# for later calls.
#
# On failure the problem is reported through FileUtils::printError() with a
# second argument of 1.
# NOTE(review): that flag is presumably 'fatal' - confirm against FileUtils.
#
sub _establishClient
{
  return if (defined $thrift_client);

  my $socket = Thrift::Socket->new($host, $port);
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  my $new_transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($new_transport);
  my $new_client = HadoopFS::FileSystemClient->new($protocol);

  my $opened = eval { $new_transport->open(); 1; };
  if (!$opened)
  {
    # $@ may be an exception object carrying a 'message' field or just a
    # plain string - never dereference it blindly
    my $error = $@;
    my $reason = $error;
    if (ref($error))
    {
      my $message = eval { $error->{message} };
      $reason = $message if (defined $message);
    }
    $reason = 'unknown error' unless (defined $reason && length($reason));
    &FileUtils::printError('Unable to connect: ' . $reason, 1);
    return;
  }

  $transport = $new_transport;
  $thrift_client = $new_client;
  return;
}
## _establishClient()
115
116
## @function _generateHDFSPath()
#
# Converts a 'HDThriftFS://...' URI string into a HadoopFS::Pathname object
# whose pathname is everything following the protocol prefix.
#
# @param  $path either a URI string or an existing HadoopFS::Pathname
# @return a HadoopFS::Pathname on success. If the value is already a
#         HadoopFS::Pathname it is returned untouched. If the string does not
#         start with the protocol prefix an error is reported through
#         FileUtils::printError() and the value is returned unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # already converted - nothing to do
  return $path if (ref($path) eq 'HadoopFS::Pathname');
  # the prefix must be at the very start of the string, as everything after
  # it is taken verbatim as the HDFS pathname
  if (defined $path && $path =~ /^HDThriftFS:\/\/(.*)$/s)
  {
    return HadoopFS::Pathname->new( { pathname => $1 } );
  }
  &FileUtils::printError('Not a valid thrift URI: ' . (defined $path ? $path : '(undefined)'));
  return $path;
}
## _generateHDFSPath()
136
137
138################################################################################
139################################################################################
140################################################################################
141
142
## @function closeFileHandle()
#
# Closes a handle previously produced by openFileHandle() and removes the tie
# from it.
#
# @param  $fh_ref a reference to the scalar holding the (glob) file handle
# @return always 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  my $fh = $$fh_ref;
  close($fh);
  # the tie was placed on the handle (glob), so that is what must be untied -
  # untie() on the scalar holding the glob reference would leave the handle
  # tied
  untie(*{$fh});
  return 1;
}
## closeFileHandle()
153
154
## @function fileSize()
#
# Looks up the 'length' field of the stat record the Thrift server returns
# for the given path.
#
# @param  $path a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @return the value of the stat record's 'length' field
#
sub fileSize
{
  my $raw_path = shift(@_);
  # a connection to the thrift server is required
  &_establishClient();
  # the server expects a proper thrift path object
  my $hdfs_path = &_generateHDFSPath($raw_path);
  return $thrift_client->stat($hdfs_path)->{length};
}
## fileSize()
168
169
## @function fileTest()
#
# Emulates Perl's file test operators for a path within the HDFS.
#
# @param  $raw_path a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @param  $test_op  one of '-d', '-e', '-f' or '-l'. A missing operator and
#                   '-l' are both treated as '-e' (symbolic linking is not
#                   supported within HDFS)
# @return 1 if the test passes, 0 otherwise. An unrecognised operator is
#         reported through FileUtils::printError() and yields 0.
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # a connection to the thrift server is required
  &_establishClient();
  # the server expects a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');

  # reject anything we can't answer before talking to the server
  if ($test_op ne '-d' && $test_op ne '-e' && $test_op ne '-f')
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }

  # every supported test fails for a path that doesn't exist
  return 0 unless ($thrift_client->exists($path));
  return 1 if ($test_op eq '-e');

  # directory versus plain file is decided by the stat record
  my $is_directory = $thrift_client->stat($path)->{'isdir'};
  return 1 if ($test_op eq '-d' && $is_directory);
  return 1 if ($test_op eq '-f' && !$is_directory);
  return 0;
}
## fileTest()
221
222
## @function filenameConcatenate()
#
# Builds a single path from a protocol prefix and any number of path parts.
#
# @param  $protocol the first argument, prepended (followed by a slash) to
#                   the joined parts
# @param  ...       the path parts, joined with '/' with runs of slashes
#                   collapsed to one
# @return the assembled path with a single trailing slash or backslash
#         removed
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  # join the parts, squashing any repeated slashes
  (my $joined_parts = join('/', @parts)) =~ s{/+}{/}g;
  # prefix with the protocol - done after squashing, as the protocol itself
  # contains repeated slashes
  my $filename = $protocol . '/' . $joined_parts;
  # drop a trailing (back)slash
  $filename =~ s{[\\/]$}{};
  return $filename;
}
## filenameConcatenate()
238
239
## @function makeDirectory()
#
# Creates a directory within the HDFS (via the server's mkdirs call) unless
# it is already there.
#
# @param  $raw_path a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @return the result of a final '-d' test on the path
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # a connection to the thrift server is required
  &_establishClient();
  # the server expects a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # only ask for creation when the directory is missing
  $thrift_client->mkdirs($path) unless (&fileTest($path, '-d'));
  # success is judged by whether the directory now exists
  return &fileTest($path, '-d');
}
## makeDirectory()
259
260
## @function modificationTime()
#
# Reports when a path within the HDFS was last modified.
#
# @param  $path a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @return the stat record's 'modificationTime' divided by 1000 and rounded
#         down (presumably converting milliseconds to seconds - TODO confirm)
#
sub modificationTime
{
  my $raw_path = shift(@_);
  # a connection to the thrift server is required
  &_establishClient();
  # the server expects a proper thrift path object
  my $hdfs_path = &_generateHDFSPath($raw_path);
  my $raw_time = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
274
275
## @function openFileHandle()
#
# Provides the caller with a 'file handle'-like object for a file within the
# HDFS: a fresh symbol tied to FileUtils::HDThriftFS::ThriftFH and then
# opened with the given path and mode.
#
# @param  $raw_path a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @param  $mode     passed through, after the path, to the tied open
# @param  $fh_ref   reference to the scalar that will receive the handle
# @return 1 on success; dies if the open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  # a connection to the thrift server is required
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  # an anonymous glob to hang the tie off
  my $handle = gensym();
  tie(*$handle, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # note the argument order: path first, then mode
  if (!open($handle, $path, $mode))
  {
    die("Failed to open thriftfs");
  }
  # hand the handle back through the caller's reference
  $$fh_ref = $handle;
  return 1;
}
## openFileHandle()
292
293
## @function readDirectory()
#
# Lists the names of the entries the Thrift server reports for a path.
#
# @param  $raw_path a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @return a reference to an array holding, for each entry, the final
#         component of its 'path' (everything after the last slash or
#         backslash)
#
sub readDirectory
{
  my ($raw_path) = @_;
  # a connection to the thrift server is required
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($path);
  # a false listing is treated the same as an empty one
  my @entries = ($listing) ? @{$listing} : ();
  # reduce each entry's full path to just the trailing name
  my @files = map { my ($name) = $_->{'path'} =~ /([^\\\/]+)$/; $name } @entries;
  return \@files;
}
## readDirectory()
316
317
## @function removeFiles()
#
# Removes a path from the HDFS. Unless the recursive flag is set only plain
# files (paths passing the '-f' test) are removed.
#
# @param  $path      a 'HDThriftFS://' URI (or HadoopFS::Pathname object)
# @param  $recursive optional flag, passed on to the server's rm call
#                    (defaults to 0)
# @return true if a removal was attempted and the path no longer exists,
#         0 if nothing was attempted
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # a connection to the thrift server is required
  &_establishClient();
  # the server expects a proper thrift path object
  $path = &_generateHDFSPath($path);
  # nothing to do for a missing path...
  return 0 unless ($thrift_client->exists($path));
  # ...and directories are only touched when recursion was requested
  return 0 unless ($recursive || &fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  # the removal worked if the path has gone
  my $removed = !$thrift_client->exists($path);
  return $removed;
}
## removeFiles()
340
341
## @function removeFilesFiltered()
#
# Depth first, recursive removal of files and directories beneath the given
# path(s), subject to optional accept and reject patterns.
#
# - a file is skipped if it matches the reject pattern, and is otherwise
#   removed if there is no accept pattern or it matches the accept pattern
# - a directory has its children processed first, and is itself removed
#   (non-recursively) only when neither pattern was supplied
#
# @param  $paths     a single URI string or a reference to an array of them
# @param  $accept_re optional pattern a file's path must match to be removed
# @param  $reject_re optional pattern that protects matching files
# @return the number of files and directories removed
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # a connection to the thrift server is required
  &_establishClient();
  my @targets = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $unfiltered = (!defined $accept_re && !defined $reject_re);
  my $num_removed = 0;
  TARGET: foreach my $raw_path (@targets)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = &_generateHDFSPath($raw_path);
    if (!$thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
      next TARGET;
    }

    # decide whether this path itself is to be removed, and what to say
    # should that removal fail
    my $failure_prefix;
    if (&fileTest($path, '-d'))
    {
      # children first
      foreach my $file (@{&readDirectory($path)})
      {
        $num_removed += &removeFilesFiltered($raw_path . '/' . $file, $accept_re, $reject_re);
      }
      # directories survive any filtered removal
      next TARGET unless ($unfiltered);
      $failure_prefix = "HDThriftFS::removeFilesFiltered() couldn't remove directory: ";
    }
    else
    {
      next TARGET if (defined $reject_re && ($raw_path =~ m/$reject_re/));
      next TARGET unless ((!defined $accept_re) || ($raw_path =~ m/$accept_re/));
      $failure_prefix = "HDThriftFS::removeFilesFiltered() couldn't remove file: ";
    }

    # non-recursive removal - so for a directory the command fails if there
    # are (somehow) still files contained within
    $thrift_client->rm($path, 0);
    if ($thrift_client->exists($path))
    {
      print STDERR $failure_prefix . $raw_path . "\n";
    }
    else
    {
      $num_removed++;
    }
  }
  return $num_removed;
}
## removeFilesFiltered()
409
410
## @function removeFilesRecursive()
#
# Removes the given path(s) and everything beneath them. A thin wrapper that
# calls the more general removeFilesFiltered() without any accept or reject
# expression.
#
# @param  $path passed straight on to removeFilesFiltered()
# @return whatever removeFilesFiltered() returns
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  return &removeFilesFiltered($path, undef, undef);
}
## removeFilesRecursive()
421
422
## @function supportsSymbolicLink()
#
# Reports whether this file system driver can create symbolic links.
#
# @return always 0
#
sub supportsSymbolicLink
{
  my $supported = 0;
  return $supported;
}
## supportsSymbolicLink()
430
431
## @function transferFile()
#
# Moves or copies a file from one location in the HDFS to another.
#
# @param  $mode 'MOVE' (a server-side rename) or 'COPY' (read the source and
#               write the destination in 4K blocks). Any other mode does
#               nothing.
# @param  $src  'HDThriftFS://' URI of the source file
# @param  $dst  'HDThriftFS://' URI of the destination. If it is an existing
#               directory the source's filename is appended to it.
# @return 0 if the source is missing or the destination already exists,
#         otherwise whether the destination exists after the transfer
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  # transferring into a directory keeps the source's filename
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as it doesn't
    #   seem thrift supports writing bytes - only strings. It'll work fine for
    #   text files though
    # NOTE(review): transferFileToLocal() Base91-decodes what read() returns
    # whereas the data is passed through untouched here - confirm the server
    # round-trips it correctly
    my $offset = 0;
    my $length = 4096;
    # Read 4K blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $length);
      # stop on 'no data', not on 'false data' - a block consisting of just
      # the character "0" is false in Perl but must still be copied
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      # a short block means the end of the file has been reached
      last if (length($data) < $length);
      $offset += $length;
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  # the transfer has worked if the destination file exists
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()
500
501
## @function transferFileFromLocal()
#
# Copies or moves a file from the local filesystem into the HDFS. The file is
# read in 4K blocks, each of which is Base91 encoded (to protect binary data)
# before being written through the thrift client.
#
# @param  $mode 'MOVE' to also delete the local source afterwards; any other
#               value leaves the source in place
# @param  $src  path of the local source file
# @param  $dst  'HDThriftFS://' URI of the destination. If it is an existing
#               directory the source's filename is appended to it.
# @return 0 if the destination file already exists; otherwise true if the
#         whole source was read and the destination exists afterwards (and,
#         when moving, the source has been removed)
#
# Dies if the local source file cannot be opened.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $fhout = $thrift_client->create($dst_path);
  my $bytes_read;
  while ($bytes_read = read($fhin, $decoded, 4096))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  # read() returns 0 at end of file but undef on error - the latter means the
  # destination is incomplete
  my $read_complete = defined $bytes_read;
  if (!$read_complete)
  {
    &FileUtils::printError('Failed to read source file (during ' . $mode . '): ' . $src);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if the destination file exists
  my $result = $read_complete && $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem - but only
  # once the transfer is known to have worked, so that a failed upload never
  # costs us the only copy of the file
  if ($result && $mode eq 'MOVE')
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
559
560
## @function transferFileToLocal()
#
# Copies or moves a file from the HDFS to the local filesystem. The file is
# requested from the thrift client in 4K blocks, each of which is Base91
# decoded before being written to the local file.
#
# @param  $mode 'MOVE' to also remove the source from the HDFS afterwards;
#               any other value leaves the source in place
# @param  $src  'HDThriftFS://' URI of the source file
# @param  $dst  path of the local destination. If it is an existing directory
#               the source's filename is appended to it.
# @return 0 if the source is missing or the destination already exists;
#         otherwise true if the local file was written and closed without
#         error (and, when moving, the source has been removed)
#
# Dies if the local destination file cannot be opened for writing.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst);
  my $fhin = $thrift_client->open($src_path);
  my $write_ok = 1;
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $length);
    # stop on 'no data', not on 'false data' - a block consisting of just the
    # character "0" is false in Perl but is still data
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    if (!print {$fhout} $decoded)
    {
      # no point carrying on once the local file can't be written to
      $write_ok = 0;
      last;
    }
    last if (length ($encoded) < $length);
    $offset += $length;
  }
  # buffered write errors (a full disk, say) may only surface on close
  if (!close($fhout))
  {
    $write_ok = 0;
  }
  $thrift_client->close($fhin);
  if (!$write_ok)
  {
    &FileUtils::printError('Failed to write destination file (during ' . $mode . '): ' . $dst);
  }
  # in general, the transfer has worked if the destination file exists and
  # nothing went wrong while writing it
  my $result = $write_ok && (-f $dst);
  # if moving, remove the source file from the HDFS filesystem - but only
  # once the local copy is known to be good, so that a failed download never
  # costs us the only copy of the file
  if ($result && $mode eq 'MOVE')
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
622
623
6241;
Note: See TracBrowser for help on using the browser.