source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27481

Last change on this file since 27481 was 27481, checked in by jmt12, 11 years ago

Adding makeAllDirectories() (which I'd only implemented in LocalFS) to FileUtils (which in turn calls the Driver specific makeDirectory() recursively) and added test for this function

File size: 19.2 KB
Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
27# allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application over and over. In order to
29# connect to the Thrift server this code needs to know the host and port the
30# server may be found on - information currently hard-coded near the top of the
31# script. There are also a number of Perl module API 'bindings' generated by
32# the Thrift compilation process... currently located within the packages of
33# the Parallel Processing extension. Note that I make use of some tie() magic
34# so as to provide calling code with 'file handle'-like objects to interact
35# with (print, readline etc), so that is pretty cool.
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
#
# Runs at compile time so that the module directories added to @INC here are
# in place before the 'use' statements further down the file are processed.
# GEXTPARALLELBUILDING and GSDLOS must already be set in the environment;
# PERL_VERSION is determined by running the extension's perl-version.pl script
# when it has not been provided.
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    my $perl_version = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    # - a command that fails to run yields undef or an empty string; only
    #   record a value that actually names a version, and drop any trailing
    #   newline so it cannot end up inside the @INC path built below
    if (defined $perl_version)
    {
      $perl_version =~ s/\s+$//;
      if ($perl_version ne '')
      {
        $ENV{'PERL_VERSION'} = $perl_version;
      }
    }
  }
  # - an empty value is as useless as a missing one
  die "PERL_VERSION not set\n" unless (defined($ENV{'PERL_VERSION'}) && $ENV{'PERL_VERSION'} ne '');
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}
58
59# Modules - Core
60use Devel::Peek;
61use MIME::Base64;
62use POSIX qw(floor);
63use Symbol;
64use Thrift::Socket;
65use Thrift::BufferedTransport;
66use Thrift::BinaryProtocol;
67# Modules - Thrift
68use HadoopFS::FileSystem;
69# Modules - Greenstone
70use FileUtils::HDThriftFS::ThriftFH;
71use MIME::Base91;
72
# Configuration
# - host and port the Thrift server is expected to be listening on
my ($host, $port) = ("localhost", 58660);
# - diagnostic switches: $debug enables _printDebug() output, while
#   $debug_encoding dumps the data passed through the local transfer functions
my ($debug, $debug_encoding) = (0, 0);

# Globals
# - connection state shared by every function in this package; both are
#   populated on demand by _establishClient()
my ($transport, $thrift_client);
82
83
## @function END()
#
# Shutdown hook: if a transport object was ever created by this package then
# close it before the interpreter exits.
#
END
{
  $transport->close() if (defined $transport);
}
95## END()
96
97
## @function _establishClient()
#
# Lazily create the Thrift client (and the transport beneath it) shared by the
# other functions in this package. Does nothing when a client already exists.
#
# A failure to open the transport is reported through FileUtils::printError().
# The cached client and transport are discarded in that case, so that a later
# call attempts the connection again instead of silently reusing a client
# whose transport never opened.
#
sub _establishClient
{
  return if (defined $thrift_client);

  my $socket = Thrift::Socket->new($host, $port);
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  $transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($transport);
  $thrift_client = HadoopFS::FileSystemClient->new($protocol);

  eval { $transport->open(); };
  if ($@)
  {
    # - copy the error immediately as $@ is easily clobbered
    my $error = $@;
    # - the error may be a hash based exception object carrying a 'message'
    #   field, or just a plain string; dereferencing a string would itself
    #   die under 'use strict'
    my $reason = $error;
    if (UNIVERSAL::isa($error, 'HASH') && defined $error->{message})
    {
      $reason = $error->{message};
    }
    # - forget the unusable connection so the next call retries
    $transport = undef;
    $thrift_client = undef;
    &FileUtils::printError('Unable to connect: ' . $reason, 1);
  }
}
120## _establishClient()
121
122
## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://...' URI string into a HadoopFS::Pathname object.
#
# @param  $path either a HadoopFS::Pathname object (returned untouched) or a
#               string starting with the 'HDThriftFS://' scheme
# @return the HadoopFS::Pathname object; if the string is not a valid URI the
#         error is reported via FileUtils::printError() and the argument is
#         returned unchanged
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # - already a thrift path object, nothing to convert
  if (ref($path) eq 'HadoopFS::Pathname')
  {
    return $path;
  }
  # - the scheme must be at the very start of the string: everything after it
  #   is the HDFS pathname. (An unanchored test would accept the scheme
  #   appearing mid-string and then cut the pathname at the wrong place.)
  if (defined $path && $path =~ /^HDThriftFS:\/\/(.*)\z/s)
  {
    return HadoopFS::Pathname->new( { pathname => $1 } );
  }
  &FileUtils::printError('Not a valid thrift URI: ' . (defined $path ? $path : ''));
  return $path;
}
141## _generateHDFSPath()
142
143
## @function _printDebug()
#
# Write a '[DEBUG]' line to STDERR, prefixed with the name of the function
# that called _printDebug(). Silent unless the file level $debug flag is set.
#
# @param $msg the text to report
#
sub _printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    # - element 3 of the caller(1) frame is the fully qualified name of the
    #   subroutine that invoked us
    my $calling_function = (caller(1))[3];
    print STDERR '[DEBUG] ' . $calling_function . ': ' . $msg . "\n";
  }
}
155## _printDebug()
156
157################################################################################
158################################################################################
159################################################################################
160
161
## @function canRead()
#
# Convenience wrapper asking checkPermission() whether 'r' access is allowed.
#
# @param  $path the file to test
# @param  ...   any further arguments (username, usergroup) are handed on to
#               checkPermission() unchanged
# @return whatever checkPermission() returns
#
sub canRead
{
  my ($path, @identity) = @_;
  return &checkPermission($path, 'r', @identity);
}
169## canRead()
170
171
## @function checkPermission()
#
# Decide whether a user may access a file in the given mode, based on the
# owner, group and permission details reported by the Thrift server's stat().
#
# The tests cascade: owner permissions are tried first (when the user is the
# owner), then group permissions (when the file's group is one of the user's
# groups), and finally the 'other' permissions. The first test that grants
# access wins.
#
# @param  $path      thrift URI string or HadoopFS::Pathname object
# @param  $mode      one of 'r', 'w' or 'x'
# @param  $username  [optional] defaults to the current user
# @param  $usergroup [optional] a group name, or a hash reference keyed by
#                    group name; defaults to the output of the 'groups'
#                    command (nothing is looked up on Windows)
# @return 1 if access is permitted, 0 otherwise
#
# NOTE(review): assumes the 'permission' field returned by stat() is a nine
# character 'rwxrwxrwx' style string - confirm against the Thrift server.
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # - position of each mode character within one rwx triplet
  my %offset_of = ('r' => 0, 'w' => 1, 'x' => 2);
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  my $on_windows = ($ENV{'GSDLOS'} =~ /^windows$/i);
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($on_windows)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - determine the group(s), as a hash reference keyed by group name
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = (ref($usergroup) eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  elsif (!$on_windows)
  {
    my $raw_groups = `groups`;
    foreach my $group_name (split(/\s/, $raw_groups))
    {
      $usergroups->{$group_name} = 1;
    }
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  # - does the triplet starting at the given offset contain the mode character
  #   in its expected position?
  my $triplet_grants = sub
  {
    my ($triplet_start) = @_;
    my $target_char = substr($permissions, $triplet_start + $offset_of{$mode}, 1);
    return ($mode eq $target_char);
  };
  # Cascade of tests: [u]ser, then [g]roup, then [o]ther
  my $has_permission = 0;
  if (defined $owner && $username eq $owner && $triplet_grants->(0))
  {
    $has_permission = 1;
  }
  if (!$has_permission && defined $group && defined $usergroups->{$group} && $triplet_grants->(3))
  {
    $has_permission = 1;
  }
  if (!$has_permission && $triplet_grants->(6))
  {
    $has_permission = 1;
  }
  return $has_permission;
}
253## checkPermission()
254
255
## @function closeFileHandle()
#
# Close a handle previously produced by openFileHandle() and break its tie.
#
# @param  $fh_ref reference to the scalar holding the (tied) file handle
# @return 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  &_printDebug('');
  close($$fh_ref);
  # - openFileHandle() ties the glob itself (tie(*$fh, ...)), so it is the
  #   glob that has to be untied. Calling untie() on the scalar that merely
  #   holds the glob reference leaves the handle tied.
  if (ref($$fh_ref))
  {
    untie(*{$$fh_ref});
  }
  else
  {
    untie($$fh_ref);
  }
  return 1;
}
266## closeFileHandle()
267
268
## @function fileSize()
#
# Report the 'length' recorded in the Thrift server's stat() of a path.
#
# @param  $path thrift URI string or HadoopFS::Pathname object
# @return the 'length' field of the stat result
#
sub fileSize
{
  my ($path) = @_;
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $hdfs_path = &_generateHDFSPath($path);
  return $thrift_client->stat($hdfs_path)->{length};
}
281## fileSize()
282
283
## @function fileTest()
#
# Emulate a subset of Perl's file test operators against the HDFS.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname object
# @param  $test_op  one of '-d', '-e', '-f' or '-l'; defaults to '-e'. As
#                   symbolic links are not supported '-l' is handled as '-e'
# @return 1 if the test passes, 0 otherwise (including for an unsupported
#         operator, which is also reported via FileUtils::printError())
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # - note: symbolic linking not supported within HDFS
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');

  # Plain existence
  if ($test_op eq '-e')
  {
    return ($thrift_client->exists($path) ? 1 : 0);
  }
  # Directory / regular file: both need the path to exist before asking
  # stat() what kind of thing it is
  if ($test_op eq '-d' || $test_op eq '-f')
  {
    return 0 unless ($thrift_client->exists($path));
    my $is_dir = $thrift_client->stat($path)->{'isdir'};
    if ($test_op eq '-d')
    {
      return ($is_dir ? 1 : 0);
    }
    return ($is_dir ? 0 : 1);
  }
  # Anything else we don't know how to answer
  &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
  return 0;
}
334## fileTest()
335
336
## @function filenameConcatenate()
#
# Build a path from a protocol prefix and any number of path parts.
#
# The parts are joined with '/', runs of '/' within the joined parts are
# collapsed to a single '/', the protocol is prepended (followed by a '/'),
# and finally one trailing slash or backslash - if present - is removed.
#
# @param  $protocol the leading protocol string
# @param  ...       the path parts to join
# @return the concatenated path
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  my $joined_parts = join('/', @parts);
  # - remove repeated slashes
  $joined_parts =~ s/[\/]+/\//g;
  # - prepend the protocol (which may itself reintroduce multiple slashes)
  my $full_path = $protocol . '/' . $joined_parts;
  # - strip a trailing slash
  $full_path =~ s/[\\\/]$//;
  return $full_path;
}
351## filenameConcatenate()
352
353
## @function isFilenameAbsolute()
#
# Paths used against the HDFS must always be absolute, so this answers true
# regardless of any argument given.
#
# @return 1
#
sub isFilenameAbsolute
{
  my $always_absolute = 1;
  return $always_absolute;
}
361## isFilenameAbsolute()
362
363
## @function makeDirectory()
#
# Create a directory on the HDFS (via the client's mkdirs()) unless it is
# already there.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname object
# @return the result of a fileTest() '-d' check made afterwards, so true only
#         if the directory now exists
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # - create the directory, but only when it is missing
  $thrift_client->mkdirs($path) unless (&fileTest($path, '-d'));
  # - check that it exists
  return (&fileTest($path, '-d'));
}
382## makeDirectory()
383
384
## @function modificationTime()
#
# Report when a path was last modified, according to the Thrift server.
#
# @param  $path thrift URI string or HadoopFS::Pathname object
# @return the stat result's 'modificationTime' divided by 1000 and rounded
#         down (presumably converting milliseconds to whole seconds - confirm
#         against the Thrift server)
#
sub modificationTime
{
  my ($path) = @_;
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $hdfs_path = &_generateHDFSPath($path);
  my $raw_mtime = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_mtime / 1000);
}
397## modificationTime()
398
399
## @function openFileHandle()
#
# Provide the caller with a 'file handle'-like object for a path on the HDFS.
# A fresh anonymous glob is tied to FileUtils::HDThriftFS::ThriftFH (which is
# handed the shared thrift client) and then opened on the path.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname object
# @param  $mode     the mode passed through to open() on the tied handle
# @param  $fh_ref   reference to the scalar that is to receive the handle
# @return 1 on success; dies if the open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  &_printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  # - ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  # - tie first, so that the open() below is routed to the tied class
  my $handle = gensym();
  tie(*$handle, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  if (!open($handle, $path, $mode))
  {
    die("Failed to open thriftfs");
  }
  $$fh_ref = $handle;
  return 1;
}
416## openFileHandle()
417
418
## @function readDirectory()
#
# List the names of the entries within a directory on the HDFS.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname object
# @return reference to an array holding the final path component of each
#         entry reported by the client's listStatus(); empty when nothing is
#         reported
#
sub readDirectory
{
  my ($raw_path) = @_;
  # - ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($path);
  # - no (or an empty) listing means there are no names to report
  return [] unless ($listing && @{$listing} > 0);
  # - keep only what follows the last slash (or backslash) of each entry's path
  my @names = map
  {
    my ($leaf_name) = $_->{'path'} =~ /([^\\\/]+)$/;
    $leaf_name;
  } @{$listing};
  return \@names;
}
440## readDirectory()
441
442
## @function removeFiles()
#
# Remove a single path from the HDFS. Without the recursive flag only a
# regular file (as judged by fileTest() '-f') will be removed.
#
# @param  $path      thrift URI string or HadoopFS::Pathname object
# @param  $recursive [optional] true to pass the recursive flag on to the
#                    client's rm(); defaults to 0
# @return true if something was removed and the path no longer exists; 0 if
#         the path did not exist or was not eligible for removal
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = &_generateHDFSPath($path);
  # - nothing to do for a missing path
  return 0 unless ($thrift_client->exists($path));
  # - and anything other than a plain file needs the recursive flag
  return 0 unless ($recursive || &fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  # - success is judged by the path having gone
  return !$thrift_client->exists($path);
}
464## removeFiles()
465
466
## @function removeFilesFiltered()
#
# Depth first, recursive removal of files (and possibly directories) from the
# HDFS, optionally restricted by accept and reject patterns.
#
# - files whose path matches $reject_re are left alone
# - files are removed when $accept_re is undefined or matches their path
# - directories are descended into, but are themselves only removed when
#   neither pattern has been supplied
#
# @param  $paths     a thrift URI string, or a reference to an array of them
# @param  $accept_re [optional] pattern a file path must match to be removed
# @param  $reject_re [optional] pattern protecting matching file paths
# @return the number of files and directories removed
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # - ensure we have a connection to the thrift server
  &_establishClient();
  my $no_filters = (!defined $accept_re && !defined $reject_re);
  # - attempt a non-recursive removal, reporting the given complaint if the
  #   target is still there afterwards. Answers 1 if removed, 0 if not.
  my $remove_and_verify = sub
  {
    my ($target, $complaint) = @_;
    $thrift_client->rm($target, 0);
    if ($thrift_client->exists($target))
    {
      print STDERR $complaint;
      return 0;
    }
    return 1;
  };
  my $num_removed = 0;
  my @candidates = (ref($paths) eq "ARRAY") ? @{$paths} : ($paths);
  foreach my $raw_path (@candidates)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = &_generateHDFSPath($raw_path);
    # Case 1: nothing there
    if (!$thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
      next;
    }
    # Case 2: a directory - deal with the children first
    if (&fileTest($path, '-d'))
    {
      foreach my $file (@{&readDirectory($path)})
      {
        $num_removed += &removeFilesFiltered($raw_path . '/' . $file, $accept_re, $reject_re);
      }
      # - the directory itself only goes when no filtering was requested, and
      #   then non-recursively so that the command fails if there are
      #   (somehow) still files contained within
      if ($no_filters)
      {
        $num_removed += $remove_and_verify->($path, "HDThriftFS::removeFilesFiltered() couldn't remove directory: " . $raw_path . "\n");
      }
      next;
    }
    # Case 3: a file - apply the reject pattern, then the accept pattern
    next if (defined $reject_re && ($raw_path =~ m/$reject_re/));
    if ((!defined $accept_re) || ($raw_path =~ m/$accept_re/))
    {
      $num_removed += $remove_and_verify->($path, "HDThriftFS::removeFilesFiltered() couldn't remove file: " . $raw_path . "\n");
    }
  }
  return $num_removed;
}
533## removeFilesFiltered()
534
535
## @function removeFilesRecursive()
#
# Remove a path and everything beneath it. Implemented as a call to the more
# general removeFilesFiltered() with neither an accept nor a reject pattern.
#
# @param  $path passed straight through to removeFilesFiltered()
# @return whatever removeFilesFiltered() returns
#
sub removeFilesRecursive
{
  my ($path) = @_;
  my ($accept_re, $reject_re) = (undef, undef);
  return &removeFilesFiltered($path, $accept_re, $reject_re);
}
545## removeFilesRecursive()
546
547
## @function supportsSymbolicLink()
#
# Symbolic links are not supported by this driver.
#
# @return 0
#
sub supportsSymbolicLink
{
  my $supported = 0;
  return $supported;
}
554## supportsSymbolicLink()
555
556
## @function transferFile()
#
# Move or copy a file from one HDFS location to another.
#
# If the destination is an existing directory the source's filename is
# appended to it. The transfer is refused (with an error reported through
# FileUtils::printError()) if the source does not exist or the destination
# already does.
#
# @param  $mode 'MOVE' (a rename on the server) or 'COPY' (read from the
#               source and written to the destination in 4K blocks)
# @param  $src  thrift URI of the source file
# @param  $dst  thrift URI of the destination file or directory
# @return true if the destination exists afterwards; 0 if the transfer was
#         refused
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  # - copying 'into' a directory keeps the source's filename
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as the data
    #   travels as strings rather than bytes. It'll work fine for text files
    #   though
    my $offset = 0;
    my $length = 4096;
    # Read 4K blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $length);
      # - end of data is signalled by nothing coming back. Test for that
      #   explicitly rather than by truth, otherwise a block consisting of
      #   just "0" (a false value in Perl) would be silently dropped
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      $offset += $length;
      # - a short block means we have reached the end of the file
      last if (length($data) < $length);
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
624## transferFile()
625
626
## @function transferFileFromLocal()
#
# Move or copy a file from the local filesystem onto the HDFS.
#
# If the destination is an existing directory the source's filename is
# appended to it. The transfer is refused if the destination file already
# exists. The local file is read in 4K blocks, each of which is Base91
# encoded before being written via the thrift client so as to protect binary
# data.
#
# @param  $mode 'MOVE' to also delete the local source once it has been
#               transferred; any other value leaves the source in place
# @param  $src  path of the local source file
# @param  $dst  thrift URI of the destination file or directory
# @return true if the destination exists afterwards (and, when moving, the
#         source has been removed); false otherwise. Dies if the local file
#         cannot be opened.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $fhout = $thrift_client->create($dst_path);
  my $bytes_read;
  while ($bytes_read = read($fhin, $decoded, 4096))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  # - read() answers 0 at end of file but undef on error; the latter means
  #   the destination is incomplete
  my $read_failed = (!defined $bytes_read);
  my $read_error = $!;
  close($fhin);
  $thrift_client->close($fhout);
  if ($read_failed)
  {
    &FileUtils::printError('Failed to read source file (during ' . $mode . '): ' . $src . ' (' . $read_error . ')');
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (!$read_failed && $thrift_client->exists($dst_path));
  # if moving, remove the source file from the local filesystem - but only
  # once we know the transfer worked, otherwise the data would be lost
  if ($mode eq 'MOVE' && $result)
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
683## transferFileFromLocal()
684
685
## @function transferFileToLocal()
#
# Move or copy a file from the HDFS onto the local filesystem.
#
# The transfer is refused (with an error reported through
# FileUtils::printError()) if the source does not exist or the destination
# already does. If the destination is an existing local directory the
# source's filename is appended to it. Data is requested from the thrift
# client in blocks of 4096, each of which is Base91 decoded before being
# written to the local file.
#
# @param  $mode 'MOVE' to also remove the source from the HDFS once it has
#               been transferred; any other value leaves the source in place
# @param  $src  thrift URI of the source file
# @param  $dst  path of the local destination file or directory
# @return true if the local file was written (and, when moving, the source
#         has been removed); false otherwise. Dies if the local file cannot
#         be opened for writing.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  my $write_ok = 1;
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $length);
    # - end of data is signalled by nothing coming back. Test for that
    #   explicitly rather than by truth, otherwise a block consisting of just
    #   "0" (a false value in Perl) would be silently dropped
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # - a failed write (disk full etc) means the local copy can't be trusted
    if (!print {$fhout} $decoded)
    {
      $write_ok = 0;
      last;
    }
    last if (length ($encoded) < $length);
    $offset += $length;
  }
  # - buffered write errors may only surface when the handle is closed
  if (!close($fhout))
  {
    $write_ok = 0;
  }
  $thrift_client->close($fhin);
  # in general, the transfer has worked if the destination file exists (and
  # nothing went wrong while writing it)
  my $result = ($write_ok && (-f $dst));
  # if moving, remove the source file from the HDFS filesystem - but only
  # once we know the local copy is good, otherwise the data would be lost
  if ($mode eq 'MOVE' && $result)
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
746## transferFileToLocal()
747
748
7491;
Note: See TracBrowser for help on using the repository browser.