source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27478

Last change on this file since 27478 was 27478, checked in by jmt12, 11 years ago

Be a bit smarter about locating Perl version if not provided (rather than just dying)

File size: 19.2 KB
RevLine 
[27386]1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
27# allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application over and over. In order to
29# connect to the Thrift server this code needs to know the host and port the
30# server may be found on - information currently hard-coded near the top of the
31# script. There are also a number of Perl module API 'bindings' generated by
32# the Thrift compilation process... currently located within the packages of
33# the Parallel Processing extension. Note that I make use of some tie() magic
34# so as to provide calling code with 'file handle'-like objects to interact
35# with (print, readline etc), so that is pretty cool.
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
#
# Runs at compile time so that @INC is extended before the 'use' statements
# that follow are resolved. GEXTPARALLELBUILDING and GSDLOS must already be
# set; PERL_VERSION is discovered by running the extension's perl-version.pl
# script when it has not been provided.
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    my $detected = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    if (defined $detected)
    {
      # the command output carries a trailing newline (and possibly other
      # whitespace) which would otherwise end up inside the @INC path below
      $detected =~ s/^\s+|\s+$//g;
      # only accept a non-empty answer - an empty string means the script
      # could not tell us anything useful
      $ENV{'PERL_VERSION'} = $detected if ($detected ne '');
    }
  }
  die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}
58
59# Modules - Core
60use Devel::Peek;
61use MIME::Base64;
62use POSIX qw(floor);
63use Symbol;
64use Thrift::Socket;
65use Thrift::BufferedTransport;
66use Thrift::BinaryProtocol;
67# Modules - Thrift
68use HadoopFS::FileSystem;
69# Modules - Greenstone
70use FileUtils::HDThriftFS::ThriftFH;
71use MIME::Base91;
72
# Configuration
# - host and port of the Thrift server to connect to
my ($host, $port) = ("localhost", 58660);
# - $debug enables the messages printed by _printDebug(); $debug_encoding
#   enables the raw/encoded data dumps made while transferring files
my ($debug, $debug_encoding) = (0, 0);

# Globals
# - shared connection state, populated by _establishClient()
my ($transport, $thrift_client);
82
83
## @function END()
#
# When the interpreter shuts down, close the Thrift transport - but only if
# one was ever created.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
96
97
## @function _establishClient()
#
# Lazily create the shared Thrift client (and the transport beneath it) the
# first time any function needs to talk to the server. While a client exists
# further calls do nothing.
#
# If the transport cannot be opened the error is reported through
# FileUtils::printError() and the half-built client and transport are thrown
# away, so that a later call can try again rather than reusing a client whose
# transport never opened.
#
sub _establishClient
{
  return if (defined $thrift_client);

  my $socket = Thrift::Socket->new($host, $port);
  # NOTE(review): timeout units are presumably milliseconds - confirm against
  # the Thrift::Socket documentation
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  $transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($transport);
  $thrift_client = HadoopFS::FileSystemClient->new($protocol);

  eval { $transport->open(); };
  if ($@)
  {
    # copy immediately - $@ is global and easily clobbered
    my $error = $@;
    # the error may be an exception object carrying a 'message' field or just
    # a plain string; dereferencing a plain string would itself die under
    # 'use strict'
    my $reason = $error;
    if (ref($error))
    {
      my $message = eval { $error->{message} };
      $reason = $message if (defined $message);
    }
    # forget the unusable connection before reporting
    $thrift_client = undef;
    $transport = undef;
    # NOTE(review): the second argument (1) presumably marks the error as
    # fatal - confirm against FileUtils::printError()
    &FileUtils::printError('Unable to connect: ' . $reason, 1);
  }
}
## _establishClient()
121
122
## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://' URI string into the HadoopFS::Pathname object the
# Thrift API expects.
#
# @param  $path  either a URI string beginning 'HDThriftFS://' or an existing
#                HadoopFS::Pathname object
# @return a HadoopFS::Pathname object; an object passed in is returned as is.
#         When the string is not a valid URI an error is reported through
#         FileUtils::printError() and the argument is returned unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # already converted - hand it straight back
  if (ref($path) eq 'HadoopFS::Pathname')
  {
    return $path;
  }
  my $prefix = 'HDThriftFS://';
  # the prefix must be at the very start of the string, as everything after
  # it is taken to be the HDFS pathname
  if (defined $path && index($path, $prefix) == 0)
  {
    return HadoopFS::Pathname->new( { pathname => substr($path, length($prefix)) } );
  }
  &FileUtils::printError('Not a valid thrift URI: ' . (defined $path ? $path : ''));
  return $path;
}
## _generateHDFSPath()
142
143
## @function _printDebug()
#
# Write a debug message to STDERR, labelled with the name of the calling
# function. Does nothing unless the $debug configuration flag is set.
#
# @param  $msg  the text to print
#
sub _printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    # element 3 of caller() is the fully qualified name of the subroutine
    # that called us
    my $function = (caller(1))[3];
    print STDERR join('', '[DEBUG] ', $function, ': ', $msg, "\n");
  }
}
## _printDebug()
156
[27386]157################################################################################
158################################################################################
159################################################################################
160
161
## @function canRead()
#
# Convenience wrapper asking checkPermission() about 'r' (read) access.
#
# @param  $path  the file to test
# @param  ...    optional username and usergroup, passed through untouched
# @return whatever checkPermission() returns
#
sub canRead
{
  my ($path, @identity) = @_;
  return checkPermission($path, 'r', @identity);
}
## canRead()
170
171
## @function checkPermission()
#
# Decide whether a user may access a file in the given mode, by comparing the
# user and their groups against the owner, group and permission string that
# the Thrift server reports for the file.
#
# @param  $path       thrift URI string or HadoopFS::Pathname object
# @param  $mode       one of 'r', 'w' or 'x'
# @param  $username   optional; defaults to the current login name
# @param  $usergroup  optional; a single group name or a hash reference keyed
#                     by group name. Defaults to the output of `groups`
#                     (nothing is looked up on Windows)
# @return 1 if access is permitted, 0 otherwise
#
# NOTE(review): assumes the 'permission' field is a nine character
# 'rwxrwxrwx' style string - confirm against the Thrift FileStatus definition
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # position of each flag within a user/group/other triplet
  my %offset_of = ('r' => 0, 'w' => 1, 'x' => 2);
  # - ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  $path = _generateHDFSPath($path);
  # - who is asking? (defaults to the current user)
  unless (defined $username)
  {
    if ($ENV{'GSDLOS'} =~ /^windows$/i)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - which groups do they belong to?
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = $usergroup;
    if (ref $usergroup ne "HASH")
    {
      $usergroups = {$usergroup => 1};
    }
  }
  elsif ($ENV{'GSDLOS'} !~ /^windows$/i)
  {
    # (no group lookup is attempted on Windows)
    my $raw_groups = `groups`;
    my @names = split(/\s/, $raw_groups);
    @{$usergroups}{@names} = (1) x scalar(@names);
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  # Work through [u]ser, [g]roup then [o]ther; each tier names where its
  # triplet starts in the permission string and whether it applies to this
  # caller. The first applicable tier granting 'mode' access wins.
  my @tiers = (
    [0, ((defined $owner && $username eq $owner) ? 1 : 0)],
    [3, ((defined $group && defined $usergroups->{$group}) ? 1 : 0)],
    [6, 1],
  );
  foreach my $tier (@tiers)
  {
    my ($base, $applies) = @{$tier};
    next unless ($applies);
    my $target_char = substr($permissions, $base + $offset_of{$mode}, 1);
    return 1 if ($mode eq $target_char);
  }
  return 0;
}
## checkPermission
254
255
## @function closeFileHandle()
#
# Close a file handle previously handed out by openFileHandle().
#
# @param  $fh_ref  reference to the scalar holding the file handle
# @return always 1
#
# NOTE(review): untie() is applied to the scalar holding the handle, whereas
# openFileHandle() ties the glob itself - confirm this releases the tie
#
sub closeFileHandle
{
  my ($fh_ref) = @_;
  _printDebug('');
  close(${$fh_ref});
  untie(${$fh_ref});
  return 1;
}
## closeFileHandle()
267
268
## @function fileSize()
#
# Report the size of a file as given by the 'length' field of the status
# record the Thrift server returns for it.
#
# @param  $path  thrift URI string or HadoopFS::Pathname object
# @return the value of the 'length' field
#
sub fileSize
{
  my ($path) = @_;
  # a connection to the thrift server is needed first
  _establishClient();
  # then the path as a proper thrift path object
  my $hdfs_path = _generateHDFSPath($path);
  my $file_stat = $thrift_client->stat($hdfs_path);
  return $file_stat->{length};
}
## fileSize()
282
283
## @function fileTest()
#
# Emulate a small subset of Perl's file test operators against the HDFS.
#
# @param  $raw_path  thrift URI string or HadoopFS::Pathname object
# @param  $test_op   '-d' (is a directory), '-e' (exists) or '-f' (exists and
#                    is not a directory). Undefined and '-l' are both treated
#                    as '-e' since symbolic links are not supported in HDFS
# @return 1 if the test passes, 0 otherwise (including when the operator is
#         not supported, which is also reported via FileUtils::printError())
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  my $path = _generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');
  # refuse anything we don't know how to answer
  if ($test_op ne '-d' && $test_op ne '-e' && $test_op ne '-f')
  {
    FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }
  # every supported test requires the path to exist
  return 0 unless ($thrift_client->exists($path));
  return 1 if ($test_op eq '-e');
  # '-d' and '-f' are opposite answers to the same question
  my $file = $thrift_client->stat($path);
  my $is_directory = $file->{'isdir'} ? 1 : 0;
  return ($test_op eq '-d') ? $is_directory : (1 - $is_directory);
}
## fileTest()
335
336
## @function filenameConcatenate()
#
# Build a path from a protocol prefix and any number of path parts.
#
# @param  $protocol  the leading part of the result, used as is
# @param  ...        path parts, joined with '/' with runs of slashes
#                    collapsed to one
# @return the protocol, a '/', then the joined parts - with one trailing
#         slash or backslash removed if present
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  my $joined = join('/', @parts);
  # squeeze runs of slashes down to a single slash
  $joined =~ tr{/}{/}s;
  # prepend the protocol - done after squeezing, so slashes belonging to the
  # protocol itself are left alone
  my $filename = $protocol . '/' . $joined;
  # drop a single trailing separator
  $filename =~ s/[\\\/]$//;
  return $filename;
}
## filenameConcatenate()
352
353
## @function isFilenameAbsolute()
#
# Every path on the HDFS is treated as absolute, whatever is passed in.
#
# @return always 1
#
sub isFilenameAbsolute
{
  # File paths against HDFS must be.
  my $always_absolute = 1;
  return $always_absolute;
}
## isFilenameAbsolute()
362
363
## @function makeDirectory()
#
# Create a directory (via the server's mkdirs call) unless it already exists.
#
# @param  $raw_path  thrift URI string or HadoopFS::Pathname object
# @return the result of testing, afterwards, that the directory exists
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  my $path = _generateHDFSPath($raw_path);
  # - create the directory only when it is missing
  $thrift_client->mkdirs($path) unless (fileTest($path, '-d'));
  # - success is judged by whether the directory is there now
  return fileTest($path, '-d');
}
## makeDirectory()
383
384
## @function modificationTime()
#
# Report when a file was last modified.
#
# @param  $path  thrift URI string or HadoopFS::Pathname object
# @return the file's 'modificationTime' status field divided by 1000 and
#         rounded down (presumably converting milliseconds to seconds -
#         confirm against the Thrift FileStatus definition)
#
sub modificationTime
{
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  my $hdfs_path = _generateHDFSPath($path);
  my $file_stat = $thrift_client->stat($hdfs_path);
  my $raw_time = $file_stat->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
398
399
## @function openFileHandle()
#
# Open a file on the HDFS and hand back a tied, 'file handle'-like object
# that calling code can use with the normal handle functions.
#
# @param  $raw_path  thrift URI string or HadoopFS::Pathname object
# @param  $mode      passed through to the tied handle's open
# @param  $fh_ref    reference to the scalar that is to receive the handle
# @return 1 on success; dies if the tied handle cannot be opened
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  _printDebug(join('', 'path: ', $raw_path, ', mode: ', $mode, ', fh_ref'));
  # ensure we have a connection to the thrift server
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  # an anonymous glob, tied so that handle operations upon it are routed to
  # the ThriftFH class
  my $fh = gensym();
  tie(*{$fh}, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  unless (open($fh, $path, $mode))
  {
    die("Failed to open thriftfs");
  }
  ${$fh_ref} = $fh;
  return 1;
}
## openFileHandle()
417
418
## @function readDirectory()
#
# List the names of the entries within a directory on the HDFS.
#
# @param  $raw_path  thrift URI string or HadoopFS::Pathname object
# @return reference to an array holding the final path component of each
#         entry the server lists (an empty array if it lists nothing)
#
sub readDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  my $raw_files = $thrift_client->listStatus($path);
  # nothing listed - nothing to report
  return [] unless ($raw_files && @{$raw_files} > 0);
  # keep only what follows the last slash (or backslash) of each path
  my @files = map
  {
    my ($filename) = $_->{'path'} =~ /([^\\\/]+)$/;
    $filename;
  } @{$raw_files};
  return \@files;
}
## readDirectory()
441
442
## @function removeFiles()
#
# Remove a single file or, when asked to recurse, whatever is at the path.
#
# @param  $path       thrift URI string or HadoopFS::Pathname object
# @param  $recursive  optional flag handed to the server's rm call; defaults
#                     to 0. Without it only plain files are removed
# @return true if something was removed and the path no longer exists, false
#         (0) when the path was missing or was not eligible for removal
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = _generateHDFSPath($path);
  # nothing to do if there is nothing there...
  return 0 unless ($thrift_client->exists($path));
  # ...or if it isn't a plain file and we weren't told to recurse
  return 0 unless ($recursive || fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  # success means the path has gone
  my $removed = !$thrift_client->exists($path);
  return $removed;
}
## removeFiles()
465
466
## @function removeFilesFiltered()
#
# Depth first, recursive removal of files and directories, optionally limited
# by accept and reject patterns applied to each file's path.
#
# @param  $paths      a single thrift URI string or a reference to an array
#                     of them
# @param  $accept_re  optional; when defined only files matching are removed
# @param  $reject_re  optional; when defined files matching are left alone
# @return the number of files and directories removed. Directories are only
#         removed when neither pattern is defined
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $num_removed = 0;
  PATH: foreach my $raw_path (@paths_array)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = _generateHDFSPath($raw_path);
    unless ($thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
      next PATH;
    }
    if (fileTest($path, '-d'))
    {
      # deal with the contents of the directory first
      foreach my $file (@{readDirectory($path)})
      {
        $num_removed += removeFilesFiltered($raw_path . '/' . $file, $accept_re, $reject_re);
      }
      # directories themselves are only removed when no filtering is in play
      next PATH if (defined $accept_re || defined $reject_re);
      # remove this directory - non-recursively so that the command fails
      # if there are (somehow) still files contained within
      $num_removed += _removeAndReport($path, $raw_path, "HDThriftFS::removeFilesFiltered() couldn't remove directory: ");
      next PATH;
    }
    # a plain file - apply the filters
    next PATH if (defined $reject_re && ($raw_path =~ m/$reject_re/));
    next PATH unless ((!defined $accept_re) || ($raw_path =~ m/$accept_re/));
    # remove this file
    $num_removed += _removeAndReport($path, $raw_path, "HDThriftFS::removeFilesFiltered() couldn't remove file: ");
  }
  return $num_removed;
}

# Non-recursively remove one path, returning 1 if it has gone afterwards or 0
# (after printing the given complaint followed by the raw path to STDERR) if
# it is still there.
sub _removeAndReport
{
  my ($path, $raw_path, $complaint) = @_;
  $thrift_client->rm($path, 0);
  if ($thrift_client->exists($path))
  {
    print STDERR $complaint . $raw_path . "\n";
    return 0;
  }
  return 1;
}
## removeFilesFiltered()
534
535
## @function removeFilesRecursive()
#
# Remove a path and everything beneath it, by way of the more general
# removeFilesFiltered() with neither an accept nor a reject expression.
#
# @param  $path  passed straight through to removeFilesFiltered()
# @return the number of files and directories removed
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  my @no_filters = (undef, undef);
  return removeFilesFiltered($path, @no_filters);
}
## removeFilesRecursive()
546
547
## @function supportsSymbolicLink
#
# Report whether this file system driver can create symbolic links.
#
# @return always 0
#
sub supportsSymbolicLink
{
  my $supported = 0;
  return $supported;
}
## supportsSymbolicLink()
555
556
## @function transferFile()
#
# Move or copy a file from one HDFS location to another.
#
# @param  $mode  'MOVE' (a server side rename) or 'COPY' (read the source and
#                write the destination, block by block). Any other value
#                transfers nothing
# @param  $src   thrift URI of the source file
# @param  $dst   thrift URI of the destination; if it is an existing
#                directory the source's filename is appended
# @return 0 if the source is missing or the destination already exists
#         (both are reported via FileUtils::printError()), otherwise whether
#         the destination exists once the transfer has been attempted
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as it doesn't seem
    #   thrift supports writing bytes
    # - only strings. May need to see if I can make Perl behave using black
    #   magic flags (marking string as binary etc) It'll work fine for text
    #   files though
    my $offset = 0;
    my $length = 4096;
    # Read 4K blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $length);
      # stop only when there is genuinely nothing left - testing the data for
      # truth would also stop on a final block consisting solely of "0"
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      # a short block means we have reached the end of the file
      last if (length($data) < $length);
      $offset += $length;
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()
625
626
## @function transferFileFromLocal()
#
# Copy or move a file from the local file system onto the HDFS. The file is
# read in 4K blocks, each of which is Base91 encoded (to protect binary data)
# before being written through the Thrift client.
#
# @param  $mode  'MOVE' removes the local source once it has been transferred
#                successfully; any other value leaves the source in place
# @param  $src   path of the local source file
# @param  $dst   thrift URI of the destination; if it is an existing
#                directory the source's filename is appended
# @return 0 if the destination file already exists (reported via
#         FileUtils::printError()) or if reading the source failed; otherwise
#         whether the destination exists afterwards and, when moving, whether
#         the source was also removed. Dies if the source cannot be opened
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $read_ok = 1;
  my $fhout = $thrift_client->create($dst_path);
  while (1)
  {
    my $bytes_read = read($fhin, $decoded, 4096);
    # read() returns undef on error and 0 at end of file - only the latter
    # means the whole file has been seen
    if (!defined $bytes_read)
    {
      &FileUtils::printError('Failed to read source file (during ' . $mode . '): ' . $src . ' (' . $! . ')');
      $read_ok = 0;
      last;
    }
    last if ($bytes_read == 0);
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary
    # NOTE(review): an earlier comment here said a Byte Order Marker is added
    # so the Thrift Server can detect the need to decode - nothing in this
    # function adds one, so presumably MIME::Base91::encode does; confirm
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if we read the whole source and the
  # destination file exists
  my $result = $read_ok && $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem - but never
  # when the transfer failed, as the source would then be the only copy
  if ($mode eq 'MOVE' && $result)
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
684
685
## @function transferFileToLocal()
#
# Copy or move a file from the HDFS onto the local file system. The file is
# fetched through the Thrift client in blocks, each of which is Base91
# decoded before being written to the local file.
#
# @param  $mode  'MOVE' removes the HDFS source once the local copy has been
#                written successfully; any other value leaves it in place
# @param  $src   thrift URI of the source file
# @param  $dst   local destination path; if it is an existing directory the
#                source's filename is appended
# @return 0 if the source is missing or the destination already exists (both
#         reported via FileUtils::printError()); false if writing the local
#         file failed; otherwise whether the local file exists afterwards
#         and, when moving, whether the source was also removed. Dies if the
#         local file cannot be opened for writing
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst);
  my $fhin = $thrift_client->open($src_path);
  my $write_error;
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $length);
    # stop only when there is genuinely nothing left - testing the data for
    # truth would also stop on a block consisting solely of "0"
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    if (!print {$fhout} $decoded)
    {
      # remember why straight away, before anything else can change $!
      $write_error = '' . $!;
      last;
    }
    last if (length ($encoded) < $length);
    $offset += $length;
  }
  # buffered write errors may only surface when the handle is closed
  if (!close($fhout) && !defined $write_error)
  {
    $write_error = '' . $!;
  }
  $thrift_client->close($fhin);
  if (defined $write_error)
  {
    &FileUtils::printError('Failed to write destination file (during ' . $mode . '): ' . $dst . ' (' . $write_error . ')');
  }
  # in general, the transfer has worked if nothing went wrong while writing
  # and the destination file exists
  my $result = (!defined $write_error) && (-f $dst);
  # if moving, remove the source file from the HDFS filesystem - but never
  # when the transfer failed, as the source would then be the only copy
  if ($mode eq 'MOVE' && $result)
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
747
748
7491;
Note: See TracBrowser for help on using the repository browser.