source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27525

Last change on this file since 27525 was 27525, checked in by jmt12, 11 years ago

Adding in a 'isHDFS()' function so that some plugins (SimpleVideoPlug) can know to move the files where other executables (HandbrakeCLI etc) can see them

File size: 19.9 KB
Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
27# allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application over and over. In order to
29# connect to the Thrift server this code needs to know the host and port the
30# server may be found on - information currently hard-coded near the top of the
31# script. There are also a number of Perl module API 'bindings' generated by
32# the Thrift compilation process... currently located within the packages of
33# the Parallel Processing extension. Note that I make use of some tie() magic
34# so as to provide calling code with 'file handle'-like objects to interact
35# with (print, readline etc), so that is pretty cool.
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
# - runs at compile time so that the Thrift and ThriftFS modules loaded by the
#   'use' statements that follow can be found via @INC
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    my $perl_version = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    # - backticks yield undef when the command could not be started, and any
    #   captured output may carry a trailing newline that must not leak into
    #   the library path built below
    if (defined $perl_version)
    {
      $perl_version =~ s/^\s+|\s+$//g;
      if ($perl_version ne '')
      {
        $ENV{'PERL_VERSION'} = $perl_version;
      }
    }
  }
  # - an empty version is as useless as a missing one when building the path
  die "PERL_VERSION not set\n" unless (defined $ENV{'PERL_VERSION'} && $ENV{'PERL_VERSION'} ne '');
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}
58
59# Modules - Core
60use Devel::Peek;
61use MIME::Base64;
62use POSIX qw(floor);
63use Symbol;
64use Thrift::Socket;
65use Thrift::BufferedTransport;
66use Thrift::BinaryProtocol;
67# Modules - Thrift
68use HadoopFS::FileSystem;
69# Modules - Greenstone
70use FileUtils::HDThriftFS::ThriftFH;
71use MIME::Base91;
72
# Configuration
# - host and port of the Thrift server (currently hard-coded)
my ($host, $port) = ("medusa.local", 58660);
# - diagnostic switches: $debug enables the messages printed by _printDebug(),
#   $debug_encoding enables dumps of the data handled by the transfer
#   functions before and after encoding
my ($debug, $debug_encoding) = (0, 0);
# - number of bytes requested per read when transferring files. Block sizes of
#   4k, 8k, 16k, 32k, 64k, 128k, 256k, 1M and 2M are the alternatives to the
#   512k currently selected. Larger blocks (4M, 8M) cause "OUT OF MEMORY"
#   errors on Medusa
my $buffer_length = 512 * 1024; # 512k blocks

# Globals
# - the shared transport and client, both populated by _establishClient()
my ($transport, $thrift_client);
95
96
## @function END()
#
# Runs as the interpreter shuts down. If a transport was ever created by
# _establishClient() it is closed here; otherwise there is nothing to do.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
109
110
## @function _establishClient()
#
# Lazily create the Thrift client (and the transport beneath it) shared by
# every function in this package. Does nothing when a client already exists.
#
# The package-level $transport and $thrift_client are only assigned once the
# transport has been opened, so a failed attempt does not leave behind a
# client bound to an unopened transport, and a later call can try again.
#
# On failure the problem is reported via FileUtils::printError().
# NOTE(review): the second argument (1) presumably marks the error as fatal -
# confirm against FileUtils::printError.
#
sub _establishClient
{
  return if (defined $thrift_client);

  ###rint "Create Thrift Client to $host:$port\n";
  my $socket = Thrift::Socket->new($host, $port);
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  my $new_transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($new_transport);
  my $new_client = HadoopFS::FileSystemClient->new($protocol);

  # - the trailing 1 lets us detect failure without relying on the truth of $@
  my $opened = eval { $new_transport->open(); 1; };
  if (!$opened)
  {
    # - take a copy straight away as any later eval would clobber $@
    my $error = $@;
    # - the error may be an exception object carrying a 'message' field or
    #   just a plain string; dereferencing a string would itself die under
    #   'use strict'
    my $message = $error;
    if (ref($error))
    {
      my $object_message = eval { $error->{message} };
      if (defined $object_message)
      {
        $message = $object_message;
      }
    }
    &FileUtils::printError('Unable to connect: ' . $message, 1);
    return;
  }

  $transport = $new_transport;
  $thrift_client = $new_client;
  return;
}
## _establishClient()
134
135
## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://...' URI into a HadoopFS::Pathname object.
#
# @param  $path  either a HadoopFS::Pathname (returned untouched) or a string
#                containing 'HDThriftFS://'
# @return the HadoopFS::Pathname for the path. If the string is not a thrift
#         URI an error is reported via FileUtils::printError() and the string
#         is returned unchanged
#
sub _generateHDFSPath
{
  my ($path) = @_;
  # Already a thrift path object - nothing to convert
  return $path if (ref($path) eq 'HadoopFS::Pathname');

  if ($path =~ /HDThriftFS:\/\//)
  {
    # Remove protocol and any host and port information
    $path =~ s/HDThriftFS:\/\/[^\/]*//;
    return HadoopFS::Pathname->new( { pathname => $path } );
  }

  &FileUtils::printError('Not a valid thrift URI: ' . $path);
  return $path;
}
## _generateHDFSPath()
157
158
## @function _printDebug()
#
# Write a debug message to STDERR, prefixed with the name of the function
# that called us. Silent unless the package-level $debug flag is set.
#
# @param  $msg  the text to print
#
sub _printDebug
{
  my ($msg) = @_;
  return unless ($debug);
  # - element 3 of caller(1) is the fully qualified name of the calling sub
  my $function = (caller(1))[3];
  print STDERR '[DEBUG] ' . $function . ': ' . $msg . "\n";
}
## _printDebug()
171
172################################################################################
173################################################################################
174################################################################################
175
176
## @function canRead()
#
# Convenience wrapper around checkPermission() asking for 'r' access.
#
# @param  $path  the path to test
# @param  ...    any further arguments (username, usergroup) are handed on
#                to checkPermission() as is
# @return whatever checkPermission() returns
#
sub canRead
{
  my ($path, @identity) = @_;
  return &checkPermission($path, 'r', @identity);
}
## canRead()
185
186
## @function checkPermission()
#
# Determine whether a user may access a path in the given mode by comparing
# the owner, group and permission values reported by the thrift stat() call
# against the user's name and groups.
#
# @param  $path       thrift URI or HadoopFS::Pathname of the file to check
# @param  $mode       one of 'r', 'w' or 'x'
# @param  $username   (optional) user to test; defaults to the current login
# @param  $usergroup  (optional) a single group name, or a hash reference
#                     keyed by group name; defaults to the output of the
#                     'groups' command (no default on windows)
# @return 1 if access is permitted, 0 otherwise
#
# NOTE(review): assumes 'permission' is a nine character string along the
# lines of 'rwxr-xr-x' - confirm against the thrift server.
# NOTE(review): the tests cascade, so an owner lacking the owner bit may still
# be granted access through the group or other bits; this is more permissive
# than POSIX style evaluation - confirm it is intended.
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # position of each access type within an 'rwx' triplet
  my %offset_of = ('r' => 0, 'w' => 1, 'x' => 2);
  my $is_windows = ($ENV{'GSDLOS'} =~ /^windows$/i);
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($is_windows)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - determine the group(s)
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  elsif (!$is_windows)
  {
    # (no means of discovering groups on windows, so the set stays empty)
    my %member_of = map { $_ => 1 } split(/\s/, `groups`);
    $usergroups = \%member_of;
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  # Work out which triplets of the permission string apply, in the order
  # they are to be tried: [u]ser, then [g]roup, and always [o]ther
  my @triplet_starts;
  if (defined $owner && $username eq $owner)
  {
    push(@triplet_starts, 0);
  }
  if (defined $group && defined $usergroups->{$group})
  {
    push(@triplet_starts, 3);
  }
  push(@triplet_starts, 6);
  # The first triplet with the mode's character in the expected position
  # grants access
  foreach my $start (@triplet_starts)
  {
    my $target_char = substr($permissions, $start + $offset_of{$mode}, 1);
    return 1 if ($mode eq $target_char);
  }
  return 0;
}
## checkPermission
269
270
## @function closeFileHandle()
#
# Close, and then untie, a file handle previously produced by
# openFileHandle().
#
# @param  $fh_ref  reference to the scalar holding the file handle
# @return always 1
#
sub closeFileHandle
{
  my ($fh_ref) = @_;
  &_printDebug('');
  close(${$fh_ref});
  untie(${$fh_ref});
  return 1;
}
## closeFileHandle()
282
283
## @function fileSize()
#
# Look up the size of a file on the HDFS.
#
# @param  $path  thrift URI or HadoopFS::Pathname of the file
# @return the 'length' field of the thrift stat() result for the path
#
sub fileSize
{
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - the stat call wants a proper thrift path object
  my $hdfs_path = &_generateHDFSPath($path);
  return $thrift_client->stat($hdfs_path)->{length};
}
## fileSize()
297
298
## @function fileTest()
#
# Emulate a subset of Perl's file test operators against the HDFS.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname to test
# @param  $test_op   '-d' (is a directory), '-e' (exists) or '-f' (exists and
#                    is not a directory). An undefined operator, or '-l', is
#                    treated as '-e'
# @return 1 if the test passes, 0 otherwise. Any other operator is reported
#         via FileUtils::printError() and yields 0
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  my $op = (!defined $test_op || $test_op eq '-l') ? '-e' : $test_op;

  if ($op eq '-e')
  {
    return ($thrift_client->exists($path)) ? 1 : 0;
  }

  if ($op eq '-d' || $op eq '-f')
  {
    # - something that doesn't exist is neither a directory nor a file
    return 0 unless ($thrift_client->exists($path));
    my $is_dir = $thrift_client->stat($path)->{'isdir'};
    if ($op eq '-d')
    {
      return ($is_dir) ? 1 : 0;
    }
    return ($is_dir) ? 0 : 1;
  }

  &FileUtils::printError('Unknown or unsupported test mode: ' . $op);
  return 0;
}
## fileTest()
350
351
## @function filenameConcatenate()
#
# Build a single path from a protocol prefix and any number of path parts.
#
# @param  $protocol  the prefix to place in front of the joined parts
# @param  ...        the path parts, which are joined with '/'
# @return the protocol, a '/', and the joined parts (runs of slashes within
#         the parts collapsed to one) with a single trailing slash or
#         backslash, if present, removed
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  my $joined = join('/', @parts);
  # collapse runs of slashes before the protocol goes on, as the protocol
  # itself may legitimately introduce a doubled slash
  $joined =~ s/[\/]+/\//g;
  my $filename = join('/', $protocol, $joined);
  # strip any trailing slash
  $filename =~ s/[\\\/]$//;
  return $filename;
}
## filenameConcatenate()
367
368
## @function isFilenameAbsolute()
#
# Report whether a path is absolute. Every path is treated as absolute, as
# file paths against HDFS must be, so any arguments are ignored.
#
# @return always 1
#
sub isFilenameAbsolute
{
  return 1;
}
## isFilenameAbsolute()
377
378
## @function isHDFS()
#
# Lets calling code discover that this driver operates on a HDFS.
#
# @return always 1
#
sub isHDFS
{
  return 1;
}
## isHDFS()
386
387
## @function makeDirectory()
#
# Create a directory on the HDFS unless it is already there.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname of the directory
# @return the result of a '-d' fileTest() on the path made after any
#         creation attempt, i.e. 1 if the directory now exists, 0 otherwise
#
sub makeDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  my $path = &_generateHDFSPath($raw_path);
  # - only ask for the directory to be created when it is missing
  $thrift_client->mkdirs($path) unless (&fileTest($path, '-d'));
  # - success is judged by the directory existing, not by what mkdirs returned
  return (&fileTest($path, '-d'));
}
## makeDirectory()
407
408
## @function modificationTime()
#
# Look up when a path on the HDFS was last modified.
#
# @param  $path  thrift URI or HadoopFS::Pathname of the file
# @return the 'modificationTime' field of the thrift stat() result divided by
#         1000 and rounded down (presumably converting milliseconds into
#         seconds - confirm against the thrift server)
#
sub modificationTime
{
  my ($path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - the stat call wants a proper thrift path object
  my $hdfs_path = &_generateHDFSPath($path);
  my $raw_time = $thrift_client->stat($hdfs_path)->{modificationTime};
  return floor($raw_time / 1000);
}
## modificationTime()
422
423
## @function openFileHandle()
#
# Provide a 'file handle'-like object for a path on the HDFS by tying a
# freshly generated glob to FileUtils::HDThriftFS::ThriftFH and opening it.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname of the file
# @param  $mode      the mode passed through to the tied handle's open
# @param  $fh_ref    reference to the scalar that is to receive the handle
# @return 1 on success; dies if the open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  &_printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  # - an anonymous glob gives us something to tie the thrift handle class to
  my $handle = gensym();
  tie(*$handle, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # - note the argument order (path then mode) is as the original call made it
  if (!open($handle, $path, $mode))
  {
    die("Failed to open thriftfs");
  }
  ${$fh_ref} = $handle;
  return 1;
}
## openFileHandle()
441
442
## @function readDirectory()
#
# List the entries found directly within a directory on the HDFS.
#
# @param  $raw_path  thrift URI or HadoopFS::Pathname of the directory
# @return reference to an array holding the final path component (the text
#         after the last slash or backslash) of the 'path' of each entry
#         reported by the thrift listStatus() call; empty if nothing is
#         listed
#
sub readDirectory
{
  my ($raw_path) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($path);
  my @files;
  if ($listing && @{$listing} > 0)
  {
    @files = map {
      # - keep only what follows the last path separator
      my ($filename) = $_->{'path'} =~ /([^\\\/]+)$/;
      $filename;
    } @{$listing};
  }
  return \@files;
}
## readDirectory()
465
466
## @function removeFiles()
#
# Remove a path from the HDFS. Nothing is attempted unless the path exists
# and either a recursive removal was requested or the path passes a '-f'
# fileTest().
#
# @param  $path       thrift URI or HadoopFS::Pathname to remove
# @param  $recursive  (optional) true to remove recursively; defaults to 0
# @return true if a removal was attempted and the path no longer exists,
#         false otherwise
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless (defined $recursive);
  # ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = &_generateHDFSPath($path);
  # - nothing to do for something that isn't there
  return 0 unless ($thrift_client->exists($path));
  # - without the recursive flag only plain files may be removed
  return 0 unless ($recursive || &fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  # - judge success by the path having gone
  return !$thrift_client->exists($path);
}
## removeFiles()
489
490
## @function removeFilesFiltered()
#
# Perform a depth first, recursive, removal of files and directories that
# match the given accept and reject patterns.
#
# @param  $paths      a single thrift URI, or a reference to an array of them
# @param  $accept_re  (optional) when defined, only files whose path matches
#                     are removed
# @param  $reject_re  (optional) when defined, files whose path matches are
#                     never removed
# @return the number of files and directories removed
#
# Directories are only themselves removed when neither pattern is defined.
# Paths that do not exist, and removals that fail, are reported on STDERR.
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my @paths_array = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $num_removed = 0;
  my $no_filters = (!defined $accept_re && !defined $reject_re);

  # Attempt a non-recursive removal of one path, returning 1 if it has gone
  # and 0 (after complaining on STDERR) if it is still there
  my $remove = sub
  {
    my ($target, $complaint, $label) = @_;
    $thrift_client->rm($target, 0);
    if ($thrift_client->exists($target))
    {
      print STDERR $complaint . $label . "\n";
      return 0;
    }
    return 1;
  };

  foreach my $raw_path (@paths_array)
  {
    # remove trailing slashes
    $raw_path =~ s/[\/\\]+$//;
    my $path = &_generateHDFSPath($raw_path);

    if (!$thrift_client->exists($path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $raw_path . "\n";
      next;
    }

    if (&fileTest($path, '-d'))
    {
      # - deal with the contents first
      foreach my $file (@{&readDirectory($path)})
      {
        $num_removed += &removeFilesFiltered($raw_path . '/' . $file, $accept_re, $reject_re);
      }
      # - then the directory itself, non-recursively so that the command
      #   fails if there are (somehow) still files contained within
      if ($no_filters)
      {
        $num_removed += $remove->($path, "HDThriftFS::removeFilesFiltered() couldn't remove directory: ", $raw_path);
      }
      next;
    }

    # - a plain file: rejection takes precedence over acceptance
    next if (defined $reject_re && ($raw_path =~ m/$reject_re/));
    if ((!defined $accept_re) || ($raw_path =~ m/$accept_re/))
    {
      $num_removed += $remove->($path, "HDThriftFS::removeFilesFiltered() couldn't remove file: ", $raw_path);
    }
  }
  return $num_removed;
}
## removeFilesFiltered()
558
559
## @function removeFilesRecursive()
#
# Remove a path and everything beneath it by calling the more general
# removeFilesFiltered() with neither an accept nor a reject expression.
#
# @param  $path  a thrift URI, or a reference to an array of them
# @return the number of files and directories removed
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  return &removeFilesFiltered($path, undef, undef);
}
## removeFilesRecursive()
570
571
## @function supportsSymbolicLink()
#
# Lets calling code discover that symbolic links are not available through
# this driver.
#
# @return always 0
#
sub supportsSymbolicLink
{
  return 0;
}
## supportsSymbolicLink()
579
580
## @function transferFile()
#
# Move or copy a file from one location on the HDFS to another.
#
# @param  $mode  'MOVE' (a thrift rename) or 'COPY' (read from the source and
#                write to the destination one block at a time). Any other
#                mode transfers nothing
# @param  $src   thrift URI of the source file
# @param  $dst   thrift URI of the destination; if it is an existing
#                directory the source's filename is appended to it
# @return 0 if the source is missing or the destination already exists,
#         otherwise whether the destination exists once the transfer is done
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  #rint STDERR "transferFile($mode, $src, $dst)\n";
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    # Open the src file for reading
    #rint STDERR "DEBUG: FHIN opened (should be 'r'): $src\n";
    my $fhin = $thrift_client->open($src_path);
    # Create the dst file for writing
    #rint STDERR "DEBUG: FHOUT created (should be 'w'): $dst\n";
    my $fhout = $thrift_client->create($dst_path);
    # Read all of src file writing to dst file
    # - this is where things have the potential to go wrong, as it doesn't seem
    #   thrift supports writing bytes
    # - only strings. May need to see if I can make Perl behave using black
    #   magic flags (marking string as binary etc) It'll work fine for text
    #   files though
    my $offset = 0;
    # Read $buffer_length sized blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $buffer_length);
      # - stop on nothing read. Testing the length, rather than the truth, of
      #   the data ensures a block consisting solely of "0" is still written
      last unless (defined $data && length($data) > 0);
      $thrift_client->write($fhout, $data);
      $offset += $buffer_length;
      # - a short block means we have reached the end of the file
      last if (length($data) < $buffer_length);
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  #rint STDERR "transferFile() => $result\n";
  return $result;
}
## transferFile()
648
649
## @function transferFileFromLocal()
#
# Copy or move a file from the local filesystem onto the HDFS. The file is
# read in raw mode, one block at a time, with each block Base91 encoded
# before being written via thrift.
#
# @param  $mode  'MOVE' to delete the local source after a successful
#                upload; any other mode leaves the source in place
# @param  $src   path of the local source file
# @param  $dst   thrift URI of the destination; if it is an existing
#                directory the source's filename is appended to it
# @return 0 if the destination file already exists; otherwise true if the
#         whole source was read and the destination exists afterwards (and,
#         when moving, the local source has gone), false if not
#
# Dies if the local file cannot be opened for reading. The local source is
# never deleted unless the upload appears to have succeeded.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $read_ok = 1;
  my $fhout = $thrift_client->create($dst_path);
  while (1)
  {
    my $bytes_read = read($fhin, $decoded, $buffer_length);
    # - read() returns undef on error, which must not be mistaken for the end
    #   of the file or a truncated upload would be reported as a success
    if (!defined $bytes_read)
    {
      &FileUtils::printError('Failed to read source file (during ' . $mode . '): ' . $src . ' (' . $! . ')');
      $read_ok = 0;
      last;
    }
    last if ($bytes_read == 0);
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary. (The original note here says a Byte
    # Order Marker is added so the Thrift Server can detect the need to decode
    # the string sent - presumably by MIME::Base91::encode, confirm)
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  close($fhin);
  $thrift_client->close($fhout);
  # in general, the transfer has worked if the whole source was read and the
  # destination file exists
  my $result = $read_ok && $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem - but only
  # once we know the upload worked, otherwise the only copy would be lost
  if ($mode eq 'MOVE' && $result)
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
707
708
## @function transferFileToLocal()
#
# Copy or move a file from the HDFS onto the local filesystem. The file is
# read via thrift one block at a time, with each block Base91 decoded before
# being written, in raw mode, to the local file.
#
# @param  $mode  'MOVE' to delete the HDFS source after a successful
#                download; any other mode leaves the source in place
# @param  $src   thrift URI of the source file
# @param  $dst   path of the local destination; if it is an existing
#                directory the source's filename is appended to it
# @return 0 if the source is missing or the destination already exists;
#         otherwise true if the local file was written without error (and,
#         when moving, the HDFS source has gone), false if not
#
# Dies if the local file cannot be opened for writing. The HDFS source is
# never deleted unless the local copy was written and closed successfully.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $write_ok = 1;
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $buffer_length);
    # - stop on nothing read. Testing the length, rather than the truth, of
    #   the data ensures a block consisting solely of "0" is still processed
    last unless (defined $encoded && length($encoded) > 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # - a failed write (disk full etc) must not go unnoticed
    unless (print {$fhout} $decoded)
    {
      $write_ok = 0;
      last;
    }
    # - a short block means we have reached the end of the file
    last if (length ($decoded) < $buffer_length);
    $offset += $buffer_length;
  }
  # - buffered write errors only surface when the handle is closed
  if (!close($fhout))
  {
    $write_ok = 0;
  }
  $thrift_client->close($fhin);
  if (!$write_ok)
  {
    &FileUtils::printError('Failed to write destination file (during ' . $mode . '): ' . $dst);
  }
  # in general, the transfer has worked if nothing went wrong while writing
  # and the destination file exists
  my $result = $write_ok && (-f $dst);
  # if moving, remove the source file from the HDFS filesystem - but only
  # once we know the local copy is complete, otherwise data would be lost
  if ($mode eq 'MOVE' && $result)
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
774
775
7761;
Note: See TracBrowser for help on using the repository browser.