source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27547

Last change on this file since 27547 was 27547, checked in by jmt12, 11 years ago

Rejigging some processing comments

File size: 20.6 KB
Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
27# allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application over and over. In order to
29# connect to the Thrift server this code needs to know the host and port the
30# server may be found on - information currently hard-coded near the top of the
31# script. There are also a number of Perl module API 'bindings' generated by
32# the Thrift compilation process... currently located within the packages of
33# the Parallel Processing extension. Note that I make use of some tie() magic
34# so as to provide calling code with 'file handle'-like objects to interact
35# with (print, readline etc), so that is pretty cool.
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
#
# Executed at compile time so that @INC has been extended before the 'use'
# statements further down this file try to load the Thrift and HadoopFS
# modules.
BEGIN
{
  # These environment variables must all be present - check them in order and
  # bail out on the first one that is missing
  foreach my $required_var ('GEXTPARALLELBUILDING', 'GSDLOS', 'GSDLCOLLECTDIR')
  {
    die $required_var . " not set\n" unless defined $ENV{$required_var};
  }
  # We need the Perl version before continuing - when the environment does not
  # already provide it, ask the extension's helper script
  unless (defined $ENV{'PERL_VERSION'})
  {
    $ENV{'PERL_VERSION'} = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
  }
  die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
  # Prepend both library locations in one go. The resulting search order is:
  # 1. the ThriftFS Perl API, then
  # 2. the Bit::Vector and Thrift modules for this OS and Perl version
  my $extension_home = $ENV{'GEXTPARALLELBUILDING'};
  unshift(@INC,
          $extension_home . '/packages/ThriftFS-0.9.0/gen-perl',
          $extension_home . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
}
59
60# Modules - Core
61use Devel::Peek;
62use MIME::Base64;
63use POSIX qw(floor);
64use Symbol;
65use Thrift::Socket;
66use Thrift::BufferedTransport;
67use Thrift::BinaryProtocol;
68# Modules - Thrift
69use HadoopFS::FileSystem;
70# Modules - Greenstone
71use FileUtils::HDThriftFS::ThriftFH;
72use MIME::Base91;
73
# Configuration
# - default location of the Thrift server (_establishClient() replaces these
#   when it finds a thrift.conf for the collection)
my ($host, $port) = ('localhost', 58660);
# - set to true for '[DEBUG]' tracing, and for dumps of the data being encoded
#   and decoded during transfers, respectively
my ($debug, $debug_encoding) = (0, 0);
# - number of bytes requested per read during file transfers
# Testing shows 64k is pretty optimal
# NOTE(review): the remark above predates the currently active 512k setting -
# confirm which value is actually intended
#my $buffer_length = 4 * 1024; # 4k blocks
#my $buffer_length = 8 * 1024; # 8k blocks
#my $buffer_length = 16 * 1024; # 16K blocks
#my $buffer_length = 32 * 1024; # 32k blocks
#my $buffer_length = 64 * 1024; # 64k blocks
#my $buffer_length = 128 * 1024; # 128k blocks
#my $buffer_length = 256 * 1024; # 256k blocks
my $buffer_length = 512 * 1024; # 512k blocks
#my $buffer_length = 1024 * 1024; # 1M blocks
#my $buffer_length = 2048 * 1024; # 2M blocks
## These cause "OUT OF MEMORY" errors on Medusa
#my $buffer_length = 4096 * 1024; # 4M blocks
#my $buffer_length = 8192 * 1024; # 8M blocks

# Globals
# - both remain undefined until _establishClient() has been called
my ($transport, $thrift_client);
97
98
## @function END()
#
# Runs as the interpreter shuts down. When a Thrift transport has been
# created by this module it is closed; otherwise there is nothing to do.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
111
112
## @function _establishClient()
#
# Lazily creates the shared Thrift client. The first call builds the socket,
# transport, protocol and client objects and opens the transport; later calls
# return immediately because the client is already defined.
#
# If the file 'etc/thrift.conf' exists inside GSDLCOLLECTDIR and starts with
# text of the form 'host:port', those values replace the default host and
# port.
#
# A failure to open the transport is reported through FileUtils::printError()
# with 1 as the second argument (presumably marking the error as fatal -
# confirm against FileUtils).
#
sub _establishClient
{
  if (!defined $thrift_client)
  {
    # Look for a configuration file to override the default localhost:58660
    # settings
    my $conf_file_path = &FileUtils::filenameConcatenate($ENV{'GSDLCOLLECTDIR'}, 'etc', 'thrift.conf');
    if (&FileUtils::fileExists($conf_file_path))
    {
      print " * Found Thrift configuration file:\n";
      my $conf_raw = &FileUtils::fileGetContents($conf_file_path);
      if ($conf_raw =~ /^([^:]+):(\d+)/)
      {
        $host = $1;
        print "   - Host: " . $host . "\n";
        $port = $2;
        print "   - Port: " . $port . "\n";
      }
    }

    print " * Creating Thrift client connected to: $host:$port\n";
    my $socket = Thrift::Socket->new($host, $port);
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);

    $transport = Thrift::BufferedTransport->new($socket);
    my $protocol = Thrift::BinaryProtocol->new($transport);
    $thrift_client = HadoopFS::FileSystemClient->new($protocol);

    eval { $transport->open(); };
    # Take a copy of $@ straight away - it is a global that any later eval
    # would overwrite
    my $error = $@;
    if ($error)
    {
      # The exception may be a hash-based object carrying a 'message' field or
      # just a plain string. Blindly dereferencing a string as a hash would
      # itself die under 'use strict' and so hide the real reason.
      my $reason = $error;
      if (ref($error) && UNIVERSAL::isa($error, 'HASH') && defined $error->{message})
      {
        $reason = $error->{message};
      }
      &FileUtils::printError('Unable to connect: ' . $reason, 1);
    }
  }
}
## _establishClient()
152
153
## @function _generateHDFSPath()
#
# Turns a 'HDThriftFS://...' URI string into a HadoopFS::Pathname object.
#
# @param  $path either a HadoopFS::Pathname (returned untouched) or a string
#               containing 'HDThriftFS://'
# @return the HadoopFS::Pathname object. When the string does not contain the
#         protocol an error is reported via FileUtils::printError() and the
#         string is handed back unchanged.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # Already a thrift path object, so there is nothing to convert
  if (ref($path) eq 'HadoopFS::Pathname')
  {
    return $path;
  }
  # Anything without the protocol marker is reported and passed back as is
  if ($path !~ /HDThriftFS:\/\//)
  {
    FileUtils::printError('Not a valid thrift URI: ' . $path);
    return $path;
  }
  # Remove protocol and any host and port information, leaving just the path
  # within the HDFS
  $path =~ s/HDThriftFS:\/\/[^\/]*//;
  return HadoopFS::Pathname->new( { pathname => $path } );
}
## _generateHDFSPath()
175
176
## @function _printDebug()
#
# Writes a '[DEBUG]' line to STDERR, but only while the file-level $debug flag
# is true. The line is prefixed with the name of the subroutine that called
# _printDebug().
#
# @param  $msg the text to print
#
sub _printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    # element 3 of the caller(1) frame is the name of the calling subroutine
    my $function = (caller(1))[3];
    print STDERR '[DEBUG] ' . $function . ': ' . $msg . "\n";
  }
}
## _printDebug()
189
190################################################################################
191################################################################################
192################################################################################
193
194
## @function canRead()
#
# Convenience wrapper around checkPermission() asking for 'r' access.
#
# @param  $path     the path to test
# @param  @identity any further arguments (username, usergroup) are handed
#                   straight on to checkPermission()
# @return whatever checkPermission() returns
#
sub canRead
{
  my ($path, @identity) = @_;
  return checkPermission($path, 'r', @identity);
}
## canRead()
203
204
## @function checkPermission()
#
# Decides whether a user may access a path in the given mode by inspecting
# the owner, group and permission fields of the path's stat record.
#
# @param  $path      thrift URI string or HadoopFS::Pathname
# @param  $mode      one of 'r', 'w' or 'x'
# @param  $username  optional - defaults to the current login
# @param  $usergroup optional - a single group name, or a hash reference
#                    keyed by group name. Defaults to the output of the
#                    'groups' command (nothing is looked up on Windows)
# @return 1 when access is allowed, 0 otherwise
#
# The user, group and other permission triplets are tried in that order and
# the first applicable triplet granting the mode wins. Note that this is a
# cascade: an owner denied by the user triplet may still be allowed by the
# group or other triplet.
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # position of each flag within a three character triplet
  my $offsets = {'r' => 0, 'w' => 1, 'x' => 2};
  my $on_windows = ($ENV{'GSDLOS'} =~ /^windows$/i);
  # - ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  $path = _generateHDFSPath($path);
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($on_windows)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin() || getpwuid($<);
    }
  }
  # - determine the group(s) as a hash keyed by group name
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  elsif (!$on_windows)
  {
    my $raw_groups = `groups`;
    $usergroups = { map { ($_ => 1) } split(/\s/, $raw_groups) };
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my ($owner, $group, $permissions) = @{$file_stat}{'owner', 'group', 'permission'};
  # Each tier pairs "does this tier apply to the user?" with the offset of its
  # triplet within the permission string (presumably of the form 'rwxrwxrwx' -
  # confirm against the Thrift server)
  my @tiers = (
    [ (defined $owner && $username eq $owner),              0 ], # [u]ser
    [ (defined $group && defined $usergroups->{$group}),    3 ], # [g]roup
    [ 1,                                                    6 ], # [o]ther
  );
  foreach my $tier (@tiers)
  {
    my ($applies, $triplet_start) = @{$tier};
    next unless $applies;
    # access is granted when the flag found at the expected position is the
    # mode letter itself (rather than, say, '-')
    my $target_char = substr($permissions, $triplet_start + $offsets->{$mode}, 1);
    if ($mode eq $target_char)
    {
      return 1;
    }
  }
  return 0;
}
## checkPermission()
287
288
## @function closeFileHandle()
#
# Closes a file handle previously produced by openFileHandle().
#
# @param  $fh_ref a reference to the scalar holding the file handle
# @return always 1
#
sub closeFileHandle
{
  my ($fh_ref) = @_;
  # an empty message still emits the calling function's name when debugging
  _printDebug('');
  close(${$fh_ref});
  # NOTE(review): the handle was tied via its glob (tie(*$fh, ...)) whereas
  # this unties the scalar holding the glob reference - confirm that this
  # actually releases the tie
  untie(${$fh_ref});
  return 1;
}
## closeFileHandle()
300
301
## @function fileSize()
#
# Looks up the size of a file within the HDFS.
#
# @param  $path    thrift URI string or HadoopFS::Pathname
# @param  $test_op accepted but not used
# @return the 'length' field of the path's stat record
#
sub fileSize
{
  my ($path, $test_op) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # convert the path into a proper thrift path object and ask for its details
  my $hdfs_path = _generateHDFSPath($path);
  return $thrift_client->stat($hdfs_path)->{length};
}
## fileSize()
315
316
## @function fileTest()
#
# Emulates a small subset of Perl's file test operators against the HDFS.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname
# @param  $test_op  '-d' (is a directory), '-e' (exists) or '-f' (exists and
#                   is not a directory). An undefined operator and '-l' are
#                   both treated as '-e'
# @return 1 when the test passes, 0 otherwise. Any other operator is reported
#         through FileUtils::printError() and yields 0
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  my $path = _generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS, so the best we can do
  # for '-l' (or no operator at all) is an existence check
  my $op = (defined $test_op && $test_op ne '-l') ? $test_op : '-e';

  if ($op eq '-e')
  {
    return ($thrift_client->exists($path)) ? 1 : 0;
  }

  if ($op eq '-d' || $op eq '-f')
  {
    # both tests fail outright for a path that isn't there
    return 0 unless ($thrift_client->exists($path));
    my $is_dir = $thrift_client->stat($path)->{'isdir'};
    my $passes = ($op eq '-d') ? $is_dir : !$is_dir;
    return ($passes) ? 1 : 0;
  }

  FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
  return 0;
}
## fileTest()
368
369
## @function filenameConcatenate()
#
# Builds a path from a protocol prefix and any number of path components.
#
# @param  $protocol the leading part of the result (e.g. protocol and host),
#                   left untouched apart from the final trailing-slash strip
# @param  @parts    the components to join with '/'
# @return the protocol, a '/', and the joined components (with runs of '/'
#         collapsed to one), less a single trailing '/' or '\' if present
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  # join the components and squash any repeated slashes that result
  (my $joined = join('/', @parts)) =~ s/[\/]+/\//g;
  # the protocol goes on the front afterwards, so that its own slashes (for
  # example the '//' following the colon) are never collapsed
  (my $full_path = $protocol . '/' . $joined) =~ s/[\\\/]$//;
  return $full_path;
}
## filenameConcatenate()
385
386
## @function isFilenameAbsolute()
#
# Reports whether a filename is absolute. Paths into the HDFS have to be, so
# the answer never depends on the arguments.
#
# @return always 1
#
sub isFilenameAbsolute
{
  return 1;
}
## isFilenameAbsolute()
395
396
## @function isHDFS()
#
# Identifies this driver as one that operates on a HDFS.
#
# @return always 1
#
sub isHDFS
{
  return 1;
}
## isHDFS()
404
405
## @function makeDirectory()
#
# Creates a directory in the HDFS unless it is already there.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname
# @return the result of a final '-d' test on the path, i.e. 1 when the
#         directory exists once we are done and 0 otherwise
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object
  my $path = _generateHDFSPath($raw_path);
  # - create the directory, but only when it is missing
  $thrift_client->mkdirs($path) unless (fileTest($path, '-d'));
  # - success is judged by the directory existing, rather than by whatever
  #   mkdirs returned
  return (fileTest($path, '-d'));
}
## makeDirectory()
425
426
## @function modificationTime()
#
# Looks up when a path in the HDFS was last modified.
#
# @param  $path thrift URI string or HadoopFS::Pathname
# @return the stat record's 'modificationTime' divided by 1000 and rounded
#         down (presumably converting milliseconds into seconds - confirm
#         against the Thrift server)
#
sub modificationTime
{
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # convert the path into a proper thrift path object and ask for its details
  my $hdfs_path = _generateHDFSPath($path);
  my $raw_time = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
440
441
## @function openFileHandle()
#
# Provides calling code with a 'file handle'-like object for a file in the
# HDFS by tying a fresh glob to FileUtils::HDThriftFS::ThriftFH.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname
# @param  $mode     passed through to the tied handle's open
# @param  $fh_ref   reference to the scalar that is to receive the handle
# @return 1 on success; dies when the open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  _printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  # ensure we have a connection to the thrift server
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  # an anonymous glob gives us something to hang the tie on
  my $fh = gensym();
  tie(*{$fh}, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # note the argument order - path first, then mode - which is what gets
  # forwarded to the tied class
  unless (open($fh, $path, $mode))
  {
    die("Failed to open thriftfs");
  }
  ${$fh_ref} = $fh;
  return 1;
}
## openFileHandle()
459
460
## @function readDirectory()
#
# Lists the contents of a directory in the HDFS.
#
# @param  $raw_path thrift URI string or HadoopFS::Pathname
# @return a reference to an array holding the final path component (the part
#         after the last '/' or '\') of each entry. The array is empty when
#         the listing is empty or false.
#
sub readDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  my $listing = $thrift_client->listStatus(_generateHDFSPath($raw_path));
  # nothing listed means nothing to report
  unless ($listing && @{$listing} > 0)
  {
    return [];
  }
  # reduce each entry's full path to just its last component
  my @files = map { my ($leaf) = $_->{'path'} =~ /([^\\\/]+)$/; $leaf; } @{$listing};
  return \@files;
}
## readDirectory()
483
484
## @function removeFiles()
#
# Removes a path from the HDFS. Without the recursive flag only paths passing
# the '-f' test are removed.
#
# @param  $path      thrift URI string or HadoopFS::Pathname
# @param  $recursive optional, defaults to 0; handed on to the thrift rm call
# @return 0 when no removal was attempted (path missing, or not a file while
#         not recursive); otherwise true exactly when the path no longer
#         exists afterwards
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # ensure we have a connection to the thrift server
  _establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = _generateHDFSPath($path);
  # - only go ahead for something that exists and that we may remove
  my $removable = ($thrift_client->exists($path) && ($recursive || fileTest($path, '-f')));
  return 0 unless ($removable);
  $thrift_client->rm($path, $recursive);
  # - it worked if the path is gone
  return !$thrift_client->exists($path);
}
## removeFiles()
507
508
## @function removeFilesFiltered()
#
# Performs a depth first, recursive, removal of files and directories,
# optionally restricted by accept and reject patterns.
#
# @param  $paths     a single path string, or a reference to an array of them
# @param  $accept_re optional - when defined only files whose path matches
#                    are removed
# @param  $reject_re optional - when defined files whose path matches are
#                    left alone (checked before $accept_re)
# @return the number of files and directories removed
#
# Directories are always descended into, but are themselves only removed when
# neither pattern was supplied. Failures are reported on STDERR.
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # ensure we have a connection to the thrift server
  _establishClient();
  # work on a copy of the list so trimming slashes below never alters the
  # caller's data
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $unfiltered = (!defined $accept_re && !defined $reject_re);
  my $num_removed = 0;
  foreach my $raw_path (@paths_array)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = _generateHDFSPath($raw_path);
    if (!$thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
      next;
    }
    # Decide whether this path is itself to be removed and, if so, pick the
    # complaint to make should the removal not work
    my $failure_prefix;
    if (fileTest($path, '-d'))
    {
      # children first...
      foreach my $file (@{readDirectory($path)})
      {
        $num_removed += removeFilesFiltered($raw_path . '/' . $file, $accept_re, $reject_re);
      }
      # ...then the directory itself, but never when filtering
      next unless ($unfiltered);
      $failure_prefix = "HDThriftFS::removeFilesFiltered() couldn't remove directory: ";
    }
    else
    {
      next if (defined $reject_re && ($raw_path =~ m/$reject_re/));
      next unless ((!defined $accept_re) || ($raw_path =~ m/$accept_re/));
      $failure_prefix = "HDThriftFS::removeFilesFiltered() couldn't remove file: ";
    }
    # remove non-recursively - for directories this means the command fails
    # if there are (somehow) still files contained within
    $thrift_client->rm($path, 0);
    if ($thrift_client->exists($path))
    {
      print STDERR $failure_prefix . $raw_path . "\n";
    }
    else
    {
      $num_removed++;
    }
  }
  return $num_removed;
}
## removeFilesFiltered()
576
577
## @function removeFilesRecursive()
#
# Removes the given path(s) and everything beneath them. This is simply
# removeFilesFiltered() called with neither an accept nor a reject pattern.
#
# @param  $path a single path, or a reference to an array of paths
# @return the number of files and directories removed
#
sub removeFilesRecursive
{
  my ($path) = @_;
  my ($no_accept_re, $no_reject_re) = (undef, undef);
  return removeFilesFiltered($path, $no_accept_re, $no_reject_re);
}
## removeFilesRecursive()
588
589
## @function supportsSymbolicLink()
#
# Reports whether this driver can create symbolic links.
#
# @return always 0
#
sub supportsSymbolicLink
{
  return 0;
}
## supportsSymbolicLink()
597
598
## @function transferFile()
#
# Moves or copies a file from one location in the HDFS to another.
#
# @param  $mode 'MOVE' or 'COPY' (any other value transfers nothing)
# @param  $src  thrift URI of the source file
# @param  $dst  thrift URI of the destination. When this is an existing
#               directory the source's filename is appended to it
# @return true when the destination exists afterwards; 0 (after reporting an
#         error) when the source is missing or the destination already exists
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as it doesn't seem
    #   thrift supports writing bytes
    # - only strings. May need to see if I can make Perl behave using black
    #   magic flags (marking string as binary etc) It'll work fine for text
    #   files though
    my $offset = 0;
    # Read $buffer_length sized blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $buffer_length);
      # Stop only when nothing came back. A plain truth test is no good here
      # as Perl considers the string "0" false, and so a block consisting of
      # just that character would be dropped.
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      $offset += $buffer_length;
      # a short block means we have reached the end of the file
      last if (length($data) < $buffer_length);
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  # in general, the transfer has worked if the destination file exists
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()
666
667
## @function transferFileFromLocal()
#
# Copies or moves a file from the local filesystem into the HDFS. The file is
# read in $buffer_length sized blocks, each of which is Base91 encoded before
# being written through the Thrift client.
#
# @param  $mode 'MOVE' to delete the local source after a successful
#               transfer; anything else leaves the source in place
# @param  $src  path of the local source file
# @param  $dst  thrift URI of the destination. When this is an existing
#               directory the source's filename is appended to it
# @return true when the destination exists afterwards (and, when moving, the
#         source has been removed); false otherwise. Dies if the source
#         cannot be opened.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $read_ok = 1;
  my $fhout = $thrift_client->create($dst_path);
  while (1)
  {
    my $bytes_read = read($fhin, $decoded, $buffer_length);
    # read() returns undef on error - that must not be mistaken for the end
    # of the file, or a truncated copy could pass for a complete one
    if (!defined $bytes_read)
    {
      &FileUtils::printError('Failed reading source file (during ' . $mode . '): ' . $src . ' (' . $! . ')');
      $read_ok = 0;
      last;
    }
    # zero bytes read is the end of the file
    last if ($bytes_read == 0);
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary
    # NOTE(review): an earlier comment here spoke of adding a Byte Order
    # Marker so the Thrift Server can detect the need to decode. Nothing in
    # this function adds one - confirm whether MIME::Base91::encode does.
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if everything was read and the
  # destination file exists
  my $result = $read_ok && $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem - but only
  # once the transfer is known to be good, as otherwise a failed move would
  # destroy the only copy of the data
  if ($result && $mode eq 'MOVE')
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
725
726
## @function transferFileToLocal()
#
# Copies or moves a file from the HDFS onto the local filesystem. The file is
# requested in $buffer_length sized blocks, each of which is Base91 decoded
# before being written to the local file.
#
# @param  $mode 'MOVE' to remove the HDFS source after a successful transfer;
#               anything else leaves the source in place
# @param  $src  thrift URI of the source file
# @param  $dst  local destination path. When this is an existing directory
#               the source's filename is appended to it
# @return true when the local file was written (and, when moving, the source
#         has been removed); false otherwise. 0 is returned, after reporting
#         an error, when the source is missing or the destination already
#         exists. Dies if the destination cannot be opened.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $write_ok = 1;
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst);
  my $fhin = $thrift_client->open($src_path);
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $buffer_length);
    # Stop only when nothing came back. A plain truth test is no good here as
    # Perl considers the string "0" false, and so a block that happens to
    # encode to just that character would be dropped.
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # remember a failed write (disk full, say) so that it isn't reported as
    # a success later on
    unless (print {$fhout} $decoded)
    {
      $write_ok = 0;
      last;
    }
    # a short block means we have reached the end of the file
    last if (length($decoded) < $buffer_length);
    $offset += $buffer_length;
  }
  # buffered write errors may only surface when the file is closed
  if (!close($fhout))
  {
    $write_ok = 0;
  }
  $thrift_client->close($fhin);
  # in general, the transfer has worked if every write succeeded and the
  # destination file exists
  my $result = $write_ok && (-f $dst);
  # if moving, remove the source file from the HDFS filesystem - but only once
  # the transfer is known to be good, as otherwise a failed move would destroy
  # the only copy of the data
  if ($result && $mode eq 'MOVE')
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
792
793
7941;
Note: See TracBrowser for help on using the repository browser.