source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27492

Last change on this file since 27492 was 27492, checked in by jmt12, 11 years ago

Some versions of Hadoop add host and protocol information into paths - and ThriftFS chokes on this. So remove any of this guff (ThriftFS's server will have this information anyway)

File size: 19.3 KB
Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
# Thrift acts as a client-server 'relay' between the Perl code and the HDFS. It
# allows for persistent connections and so is significantly faster than
# repeatedly starting Hadoop's Java application. In order to connect to the
# Thrift server this code needs to know the host and port the server may be
# found on - information currently hard-coded near the top of the script.
# There are also a number of Perl module API 'bindings' generated by the
# Thrift compilation process... currently located within the packages of the
# Parallel Processing extension. Note that I make use of some tie() magic so
# as to provide calling code with 'file handle'-like objects to interact with
# (print, readline etc).
36
package FileUtils::HDThriftFS;

# Pragma
use strict;

# Setup Environment
#
# Prepends to @INC the directories holding the Bit::Vector/Thrift runtime
# modules and the generated ThriftFS Perl API. Dies unless
# GEXTPARALLELBUILDING and GSDLOS are set. PERL_VERSION is taken from the
# environment when present, otherwise from the extension's perl-version.pl
# helper script.
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    my $perl_version = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    if (defined $perl_version)
    {
      # A trailing newline would otherwise end up inside the @INC path below
      chomp($perl_version);
      # Backticks give '' (not undef) when the helper prints nothing, so only
      # accept a non-empty answer - otherwise the check below reports it
      if ($perl_version ne '')
      {
        $ENV{'PERL_VERSION'} = $perl_version;
      }
    }
  }
  die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}
58
59# Modules - Core
60use Devel::Peek;
61use MIME::Base64;
62use POSIX qw(floor);
63use Symbol;
64use Thrift::Socket;
65use Thrift::BufferedTransport;
66use Thrift::BinaryProtocol;
67# Modules - Thrift
68use HadoopFS::FileSystem;
69# Modules - Greenstone
70use FileUtils::HDThriftFS::ThriftFH;
71use MIME::Base91;
72
# Configuration
# - address of the ThriftFS server that relays requests to HDFS
my $host = "localhost";
my $port = 58660;
# - non-zero enables _printDebug() tracing
my $debug = 0;
# - non-zero Dump()s every transferred chunk before and after (de|en)coding
my $debug_encoding = 0;

# Globals
# - created on demand by _establishClient() and shared by all functions below
my ($transport, $thrift_client);
82
83
## @function END()
#
# Interpreter shutdown hook: if _establishClient() ever created a transport,
# close it rather than abandoning the connection.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
96
97
## @function _establishClient()
#
# Lazily build the shared Thrift client (socket -> buffered transport ->
# binary protocol -> HadoopFS::FileSystemClient) for $host:$port. Once a
# client exists, further calls do nothing.
#
# If the transport cannot be opened, the error is reported through
# FileUtils::printError (flagged with 1). The half-built client and
# transport are then discarded so that a later call attempts a fresh
# connection instead of reusing a dead one.
#
sub _establishClient
{
  if (!defined $thrift_client)
  {
    ###rint "Create Thrift Client to $host:$port\n";
    my $socket = Thrift::Socket->new($host, $port);
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);

    $transport = Thrift::BufferedTransport->new($socket);
    my $protocol = Thrift::BinaryProtocol->new($transport);
    $thrift_client = HadoopFS::FileSystemClient->new($protocol);

    eval { $transport->open(); };
    if ($@)
    {
      my $error = $@;
      # Thrift exceptions are hash-based objects with a 'message' field, but a
      # plain die() string is also possible - dereferencing that under
      # 'use strict' would itself die and hide the original problem
      my $message = $error;
      if (ref($error) && UNIVERSAL::isa($error, 'HASH') && defined $error->{message})
      {
        $message = $error->{message};
      }
      # Forget the unusable connection so the next call can retry
      undef $thrift_client;
      undef $transport;
      &FileUtils::printError('Unable to connect: ' . $message, 1);
    }
  }
}
## _establishClient()
121
122
## @function _generateHDFSPath()
#
# Convert an HDThriftFS:// URI into a HadoopFS::Pathname for the Thrift API.
# A value that is already a HadoopFS::Pathname is returned untouched.
#
# The "HDThriftFS://" prefix and the host[:port] that follows it are
# stripped, since the ThriftFS server supplies that context itself. Runs of
# slashes in the remainder are collapsed to one. Callers such as
# filenameConcatenate() can produce "//dir/...", and Hadoop's Path (like
# java.net.URI) reads a leading '//' as introducing an authority rather than
# a directory. An empty remainder means the filesystem root.
#
# A string without the HDThriftFS:// marker is reported via
# FileUtils::printError and returned unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  if (ref($path) ne 'HadoopFS::Pathname')
  {
    if ($path !~ /HDThriftFS:\/\//)
    {
      &FileUtils::printError('Not a valid thrift URI: ' . $path);
    }
    else
    {
      # Remove protocol and any host and port information
      $path =~ s/HDThriftFS:\/\/[^\/]*//;
      # Collapse repeated slashes so '//x' is never taken as an authority
      $path =~ s/\/{2,}/\//g;
      # Nothing left over refers to the root directory
      if ($path eq '')
      {
        $path = '/';
      }
      $path = HadoopFS::Pathname->new( { pathname => $path } );
    }
  }
  return $path;
}
## _generateHDFSPath()
144
145
## @function _printDebug()
#
# When $debug is set, write "[DEBUG] <calling function>: <msg>" to STDERR.
# Otherwise do nothing.
#
sub _printDebug
{
  my ($msg) = @_;
  return unless $debug;
  # Element 3 of caller(1) is the name of the sub that invoked us
  my $caller_name = (caller(1))[3];
  print STDERR '[DEBUG] ' . $caller_name . ': ' . $msg . "\n";
}
## _printDebug()
158
159################################################################################
160################################################################################
161################################################################################
162
163
## @function canRead()
#
# Convenience wrapper: may the given (or current) user read $path?
# Any trailing arguments (username, usergroup) are forwarded to
# checkPermission() unchanged.
#
sub canRead
{
  my ($path, @identity) = @_;
  return &checkPermission($path, 'r', @identity);
}
## canRead()
172
173
## @function checkPermission()
#
# Decide whether a user may perform 'r', 'w' or 'x' access on an HDFS path,
# using the owner, group and permission string returned by the Thrift
# server's stat().
#
# Exactly one permission class is consulted, as HDFS itself does:
# - the owner bits, if the user owns the file;
# - otherwise the group bits, if the user belongs to the file's group;
# - otherwise the "other" bits.
#
# @param  $path       HDThriftFS:// URI string or HadoopFS::Pathname
# @param  $mode       'r', 'w' or 'x'; anything else is never granted
# @param  $username   optional; defaults to the current login name
# @param  $usergroup  optional group name, or hashref keyed by group name;
#                     defaults to the output of `groups` (none on Windows)
# @return 1 if access is allowed, otherwise 0 (including when the path does
#         not exist)
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  my $offsets = {'r' => 0, 'w' => 1, 'x' => 2};
  # - an unrecognised mode has no permission bit to test
  if (!defined $mode || !exists $offsets->{$mode})
  {
    return 0;
  }
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  # - a missing path grants nothing (rather than handing it to stat())
  if (!$thrift_client->exists($path))
  {
    return 0;
  }
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($ENV{'GSDLOS'} =~ /^windows$/i)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - determine the groups the user belongs to
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  elsif ($ENV{'GSDLOS'} !~ /^windows$/i)
  {
    # (no group lookup is implemented for Windows)
    my $raw_groups = `groups`;
    if (defined $raw_groups)
    {
      # split(' ') skips leading/trailing whitespace, so no empty group names
      foreach my $group (split(' ', $raw_groups))
      {
        $usergroups->{$group} = 1;
      }
    }
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  if (!defined $permissions)
  {
    return 0;
  }
  # Pick the single applicable class: [u]ser (0), [g]roup (3) or [o]ther (6)
  my $class_offset = 6;
  if (defined $owner && defined $username && $username eq $owner)
  {
    $class_offset = 0;
  }
  elsif (defined $group && defined $usergroups->{$group})
  {
    $class_offset = 3;
  }
  my $target_char = substr($permissions, $class_offset + $offsets->{$mode}, 1);
  if (!defined $target_char)
  {
    return 0;
  }
  # Hadoop renders execute-plus-sticky-bit as 't' in the final position
  if ($target_char eq $mode || ($mode eq 'x' && $target_char eq 't'))
  {
    return 1;
  }
  return 0;
}
## checkPermission
256
257
## @function closeFileHandle()
#
# Close a handle produced by openFileHandle() and remove its tie.
#
# @param  $fh_ref  reference to the scalar holding the (tied) glob reference
# @return 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  &_printDebug('');
  # close first so the tied CLOSE method releases the remote handle
  close($$fh_ref);
  # openFileHandle() tied the glob itself (tie(*$fh, ...)), so untie the glob.
  # untie($$fh_ref) would target the scalar holding the reference, which is
  # not tied, and would silently do nothing.
  untie(*{$$fh_ref});
  return 1;
}
## closeFileHandle()
269
270
## @function fileSize()
#
# Return the 'length' field of the Thrift stat() record for $path.
# Only the first argument is used.
#
sub fileSize
{
  my ($path) = @_;
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($path);
  my $status = $thrift_client->stat($hdfs_path);
  return $status->{length};
}
## fileSize()
284
285
## @function fileTest()
#
# Emulate Perl's -d / -e / -f file tests against HDFS.
#
# @param  $raw_path  HDThriftFS:// URI or HadoopFS::Pathname
# @param  $test_op   '-d', '-e' or '-f'. '-l' and undef are treated as '-e',
#                    because HDFS has no symbolic links.
# @return 1 if the test holds, else 0. Any other operator is reported via
#         FileUtils::printError and yields 0.
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  if (!defined $test_op || $test_op eq '-l')
  {
    $test_op = '-e';
  }
  my %supported = ('-d' => 1, '-e' => 1, '-f' => 1);
  if (!$supported{$test_op})
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }
  # every supported test first requires the path to exist
  return 0 if (!$thrift_client->exists($path));
  return 1 if ($test_op eq '-e');
  # '-d' and '-f' are decided by the stat record's isdir flag
  my $is_directory = $thrift_client->stat($path)->{'isdir'} ? 1 : 0;
  if ($test_op eq '-d')
  {
    return $is_directory;
  }
  return $is_directory ? 0 : 1;
}
## fileTest()
337
338
## @function filenameConcatenate()
#
# Build an HDThriftFS path from a protocol prefix and path components.
#
# Components are joined with '/' and runs of '/' among them are squeezed to
# one. The protocol is then prefixed with a '/' separator; this may leave a
# doubled slash after the protocol, which is not touched here. A single
# trailing '/' or '\' is removed.
#
sub filenameConcatenate
{
  my ($protocol, @components) = @_;
  (my $joined = join('/', @components)) =~ s{/+}{/}g;
  my $result = join('/', $protocol, $joined);
  $result =~ s{[\\/]$}{};
  return $result;
}
## filenameConcatenate()
354
355
## @function isFilenameAbsolute()
#
# Paths handed to HDFS through Thrift are always absolute, so this answers
# 1 regardless of the argument.
#
sub isFilenameAbsolute
{
  return 1;
}
## isFilenameAbsolute()
364
365
## @function makeDirectory()
#
# Ensure a directory (and any missing parents, via mkdirs) exists in HDFS.
#
# @return the result of fileTest(-d) on the path after the attempt
#
sub makeDirectory
{
  my ($raw_path) = @_;
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  # only ask the server to create it when it is not already a directory
  $thrift_client->mkdirs($path) unless (&fileTest($path, '-d'));
  return (&fileTest($path, '-d'));
}
## makeDirectory()
385
386
## @function modificationTime()
#
# Return the path's stat() modificationTime divided by 1000 and rounded
# down. The value appears to be in milliseconds, so the result would be
# whole seconds - confirm against the server.
#
sub modificationTime
{
  my ($path) = @_;
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($path);
  my $raw_time = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
400
401
## @function openFileHandle()
#
# Create a file-handle-like object for an HDFS path by tying a fresh glob to
# FileUtils::HDThriftFS::ThriftFH, then open it.
#
# @param  $raw_path  HDThriftFS:// URI or HadoopFS::Pathname
# @param  $mode      open mode, passed through to the tied OPEN
# @param  $fh_ref    scalar reference that receives the handle
# @return 1 on success; dies (naming the path and mode) on failure
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  &_printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  # ensure we have a connection to the thrift server
  &_establishClient();
  #rint STDERR "DEBUG: openFileHandle($raw_path, $mode, fh_ref)\n";
  my $path = &_generateHDFSPath($raw_path);
  my $fh = gensym();
  tie(*$fh, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # NOTE(review): path precedes mode here, unlike a normal 3-arg open();
  # presumably this is the order ThriftFH's OPEN expects - confirm
  if (!open($fh, $path, $mode))
  {
    # don't leave the glob tied to a handle that never opened
    untie(*$fh);
    die("Failed to open thriftfs: " . $raw_path . " (mode: " . $mode . ")");
  }
  $$fh_ref = $fh;
  return 1;
}
## openFileHandle()
419
420
## @function readDirectory()
#
# List an HDFS directory.
#
# @return arrayref of entry names - the final path component of each 'path'
#         returned by listStatus() (those paths may carry scheme/host info)
#
sub readDirectory
{
  my ($raw_path) = @_;
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($path);
  my @names = ();
  if ($listing)
  {
    foreach my $entry (@{$listing})
    {
      my ($name) = $entry->{'path'} =~ /([^\\\/]+)$/;
      push(@names, $name);
    }
  }
  return \@names;
}
## readDirectory()
443
444
## @function removeFiles()
#
# Remove an HDFS path.
#
# @param  $path       HDThriftFS:// URI or HadoopFS::Pathname
# @param  $recursive  optional, default 0. Without it, only plain files are
#                     removed.
# @return true if the path was removed, otherwise 0 or false (also when it
#         did not exist, or is a directory without $recursive)
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  &_establishClient();
  $path = &_generateHDFSPath($path);
  return 0 unless ($thrift_client->exists($path));
  return 0 unless ($recursive || &fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  # success means the path is gone afterwards
  return !$thrift_client->exists($path);
}
## removeFiles()
467
468
## @function removeFilesFiltered()
#
# Depth-first removal of files, and (when unfiltered) directories, beneath
# one or more HDFS paths.
#
# @param  $paths      a single path, or an arrayref of paths
# @param  $accept_re  optional pattern; if given, only files matching it are
#                     removed
# @param  $reject_re  optional pattern; files matching it are never removed
# @return number of files and directories actually removed
#
# Directories are only removed when neither pattern is given. Problems are
# reported on STDERR rather than raised.
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  &_establishClient();
  my @targets = (ref $paths eq "ARRAY") ? @{$paths} : ($paths);
  my $removed_count = 0;
  TARGET:
  foreach my $target (@targets)
  {
    # strip trailing slashes so child paths are not built with doubled ones
    $target =~ s/[\/\\]+$//;
    my $hdfs_path = &_generateHDFSPath($target);
    if (!$thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $target . "\n";
      next TARGET;
    }
    my $failure_message;
    if (&fileTest($hdfs_path, '-d'))
    {
      # empty the directory first (depth first)
      foreach my $child (@{&readDirectory($hdfs_path)})
      {
        $removed_count += &removeFilesFiltered($target . '/' . $child, $accept_re, $reject_re);
      }
      # a filtered removal leaves the directory structure in place
      next TARGET if (defined $accept_re || defined $reject_re);
      $failure_message = "HDThriftFS::removeFilesFiltered() couldn't remove directory: ";
    }
    else
    {
      next TARGET if (defined $reject_re && ($target =~ m/$reject_re/));
      next TARGET if (defined $accept_re && ($target !~ m/$accept_re/));
      $failure_message = "HDThriftFS::removeFilesFiltered() couldn't remove file: ";
    }
    # non-recursive, so a directory that (somehow) still has contents fails
    $thrift_client->rm($hdfs_path, 0);
    if ($thrift_client->exists($hdfs_path))
    {
      print STDERR $failure_message . $target . "\n";
    }
    else
    {
      $removed_count++;
    }
  }
  return $removed_count;
}
## removeFilesFiltered()
536
537
## @function removeFilesRecursive()
#
# Remove a path and everything below it. This is removeFilesFiltered() with
# no accept or reject patterns.
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  my $no_filter = undef;
  return &removeFilesFiltered($path, $no_filter, $no_filter);
}
## removeFilesRecursive()
548
549
## @function supportsSymbolicLink()
#
# HDFS offers no symbolic links through this interface, so this always
# answers 0.
#
sub supportsSymbolicLink
{
  return 0;
}
## supportsSymbolicLink()
557
558
## @function transferFile()
#
# Move or copy a file within HDFS.
#
# @param  $mode  'MOVE' (server-side rename) or 'COPY' (streamed through this
#                client in 4096-byte read requests)
# @param  $src   source HDThriftFS:// URI
# @param  $dst   destination URI. If it is an existing directory, the
#                source's final path component is appended.
# @return whether the destination exists afterwards; 0 if the source is
#         missing or the destination already exists (reported via
#         FileUtils::printError)
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  #rint STDERR "transferFile($mode, $src, $dst)\n";
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    my $fhin = $thrift_client->open($src_path);
    my $fhout = $thrift_client->create($dst_path);
    # Whatever read() returns is handed straight to write(). Thrift appears
    # to only carry strings, not raw bytes, so this is fine for text files
    # but unverified for binary ones.
    my $offset = 0;
    my $length = 4096;
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $length);
      # End of data. Test definedness/length rather than truth, or a chunk
      # consisting solely of "0" would end the copy early.
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      $offset += $length;
      # a short chunk means nothing further remains
      last if (length($data) < $length);
    }
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  #rint STDERR "transferFile() => $result\n";
  return $result;
}
## transferFile()
627
628
## @function transferFileFromLocal()
#
# Copy or move a local file into HDFS. Each 4096-byte chunk is encoded with
# MIME::Base91 before being written through Thrift.
#
# @param  $mode  'COPY' or 'MOVE' (MOVE also unlinks the local source)
# @param  $src   local file path
# @param  $dst   destination HDThriftFS:// URI. If it is an existing
#                directory, the source's final path component is appended.
# @return whether the transfer succeeded. 0 if the destination already
#         exists. Dies if the local file cannot be opened or read; after a
#         read error the partial destination is removed.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - refuse if anything (file or directory) is already there
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $fhout = $thrift_client->create($dst_path);
  while (1)
  {
    my $bytes_read = read($fhin, $decoded, 4096);
    # read() returns undef on error - stopping silently would leave a
    # truncated copy that is reported as a success
    if (!defined $bytes_read)
    {
      my $read_error = $!;
      close($fhin);
      $thrift_client->close($fhout);
      $thrift_client->rm($dst_path, 0);
      die("Failed to read from file: " . $src . " (" . $read_error . ")");
    }
    last if ($bytes_read == 0);
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base64 encode to protect binary
    #my $encoded = encode_base64($decoded);
    # Base91 encode to protect binary. The Thrift server is meant to detect
    # encoded data via a Byte Order Marker - presumably added by
    # MIME::Base91::encode, since nothing here adds one (confirm).
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if the destination file exists
  my $result = $thrift_client->exists($dst_path);
  # if moving, remove the local source - but only once the copy is known to
  # exist, otherwise a failed transfer would lose the data altogether
  if ($mode eq 'MOVE' && $result)
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
686
687
## @function transferFileToLocal()
#
# Copy or move an HDFS file to the local filesystem. Data is fetched in
# 4096-byte read requests and each chunk is decoded with MIME::Base91.
#
# @param  $mode  'COPY' or 'MOVE' (MOVE also removes the HDFS source)
# @param  $src   source HDThriftFS:// URI
# @param  $dst   local destination. If it is an existing directory, the
#                source's final path component is appended.
# @return whether the transfer succeeded. 0 if the source is missing, the
#         destination exists, or writing the local file failed. Dies if the
#         local file cannot be opened.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $encoded = undef;
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  my $write_ok = 1;
  # Test definedness/length rather than truth, otherwise a chunk consisting
  # solely of "0" would end the transfer early
  while (defined($encoded = $thrift_client->read($fhin, $offset, $length)) && length($encoded))
  {
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    #my $decoded = decode_base64($encoded);
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    if (!print {$fhout} $decoded)
    {
      $write_ok = 0;
      last;
    }
    last if (length ($encoded) < $length);
    $offset += $length;
  }
  # buffered write errors may only surface when the handle is closed
  if (!close($fhout))
  {
    $write_ok = 0;
  }
  $thrift_client->close($fhin);
  if (!$write_ok)
  {
    &FileUtils::printError('Failed to write destination file (during ' . $mode . '): ' . $dst);
    # don't leave a truncated local copy behind
    unlink($dst);
  }
  # in general, the transfer has worked if the destination file exists
  my $result = $write_ok && (-f $dst);
  # if moving, remove the HDFS source - but only after a successful local
  # copy, otherwise a failed transfer would lose the data altogether
  if ($mode eq 'MOVE' && $result)
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
749
750
# A Perl module must evaluate to a true value when loaded via use/require.
1;
Note: See TracBrowser for help on using the repository browser.