source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27514

Last change on this file since 27514 was 27514, checked in by jmt12, 11 years ago

Altering code to allow configurable length of read/write buffer when moving from local to remote and vice versa

File size: 19.8 KB
Line 
1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
# Thrift acts as a client-server 'relay' between the Perl code and HDFS. It
# allows for persistent connections and so is significantly faster than
# repeatedly starting Hadoop's Java application. In order to connect to the
# Thrift server this code needs to know the host and port the server may be
# found on - information currently hard-coded near the top of this module
# ($host and $port). There are also a number of Perl module API 'bindings'
# generated by the Thrift compilation process, currently located within the
# packages of the Parallel Processing extension. Note that tie() is used to
# provide calling code with 'file handle'-like objects to interact with
# (print, readline etc).
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
# Extend @INC so the bundled Bit::Vector/Thrift modules and the generated
# ThriftFS Perl bindings can be found by the 'use' statements below. This has
# to happen in a BEGIN block because 'use' is resolved at compile time.
BEGIN
{
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  # We need the Perl version before continuing
  if (!defined $ENV{'PERL_VERSION'})
  {
    $ENV{'PERL_VERSION'} = `perl -S $ENV{'GEXTPARALLELBUILDING'}/bin/script/perl-version.pl`;
    # Backtick output keeps any trailing newline the helper prints; strip it
    # so it cannot end up inside the library path constructed below.
    chomp($ENV{'PERL_VERSION'}) if defined $ENV{'PERL_VERSION'};
  }
  die "PERL_VERSION not set\n" unless defined $ENV{'PERL_VERSION'};
  # Bit::Vector and Thrift modules
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/' . $ENV{'GSDLOS'} . '/lib/perl/' . $ENV{'PERL_VERSION'});
  # ThriftFS Perl API
  unshift (@INC, $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/gen-perl');
}
58
59# Modules - Core
60use Devel::Peek;
61use MIME::Base64;
62use POSIX qw(floor);
63use Symbol;
64use Thrift::Socket;
65use Thrift::BufferedTransport;
66use Thrift::BinaryProtocol;
67# Modules - Thrift
68use HadoopFS::FileSystem;
69# Modules - Greenstone
70use FileUtils::HDThriftFS::ThriftFH;
71use MIME::Base91;
72
# Configuration
# - where the Thrift server relaying requests to HDFS is listening
my $host = 'localhost';
my $port = 58660;
# - diagnostics: $debug enables _printDebug() tracing, $debug_encoding dumps
#   the data before/after Base91 encoding during local<->remote transfers
my $debug = 0;
my $debug_encoding = 0;
# - size of each read/write request used when streaming file content.
#   Alternatives previously listed here as commented-out options: every
#   power of two from 4K (4 * 1024) up to 8M (8192 * 1024).
my $buffer_length = 64 * 1024; # 64k blocks

# Globals
# - created lazily by _establishClient() and shared by every function below
my $transport;
my $thrift_client;
94
95
## @function END()
#
# At interpreter shutdown close the shared Thrift transport - but only if
# _establishClient() ever created one.
#
END
{
  $transport->close() if defined $transport;
}
## END()
108
109
## @function _establishClient()
#
# Lazily connect to the Thrift server at $host:$port. The first call builds
# the socket -> buffered transport -> binary protocol -> HadoopFS client
# stack and opens the transport; subsequent calls return immediately while a
# client exists.
#
# If the transport cannot be opened, the error is reported through
# FileUtils::printError() (second argument 1) and both globals are reset, so
# a later call retries the connection instead of reusing a client whose
# transport never opened.
#
sub _establishClient
{
  if (!defined $thrift_client)
  {
    my $socket = Thrift::Socket->new($host, $port);
    $socket->setSendTimeout(10000);
    $socket->setRecvTimeout(20000);

    $transport = Thrift::BufferedTransport->new($socket);
    my $protocol = Thrift::BinaryProtocol->new($transport);
    $thrift_client = HadoopFS::FileSystemClient->new($protocol);

    eval { $transport->open(); };
    if ($@)
    {
      my $error = $@;
      # Thrift normally throws TException objects carrying a 'message' field,
      # but dereferencing a plain string error as a hash would itself die
      # under 'use strict' and hide the real problem.
      my $message = ref($error) ? eval { $error->{message} } : $error;
      $message = $error unless defined $message;
      # forget the half-built client so that a later call can retry
      undef $thrift_client;
      undef $transport;
      &FileUtils::printError('Unable to connect: ' . $message, 1);
    }
  }
}
## _establishClient()
133
134
## @function _generateHDFSPath()
#
# Convert a 'HDThriftFS://[host[:port]]/path' URI into the HadoopFS::Pathname
# object the Thrift client expects. HadoopFS::Pathname objects are returned
# unchanged. Any other value that does not *start* with the HDThriftFS://
# protocol is reported via FileUtils::printError() and returned as-is.
#
# @param  $path  HDThriftFS:// URI string or HadoopFS::Pathname
# @return HadoopFS::Pathname (or the unmodified input when it was invalid)
#
sub _generateHDFSPath
{
  my ($path) = @_;
  if (ref($path) ne 'HadoopFS::Pathname')
  {
    # Anchor the protocol: an unanchored match would also accept (and then
    # mangle) a path that merely contains 'HDThriftFS://' somewhere inside.
    if ($path !~ /\AHDThriftFS:\/\//)
    {
      &FileUtils::printError('Not a valid thrift URI: ' . $path);
    }
    else
    {
      # Remove protocol and any host and port information
      $path =~ s/\AHDThriftFS:\/\/[^\/]*//;
      $path = HadoopFS::Pathname->new( { pathname => $path } );
    }
  }
  return $path;
}
## _generateHDFSPath()
156
157
## @function _printDebug()
#
# If the module-level $debug switch is on, write the message to STDERR,
# prefixed with '[DEBUG]' and the name of the function that called us.
#
sub _printDebug
{
  my ($msg) = @_;
  if ($debug)
  {
    my $caller_name = (caller(1))[3];
    print STDERR join('', '[DEBUG] ', $caller_name, ': ', $msg, "\n");
  }
}
## _printDebug()
170
171################################################################################
172################################################################################
173################################################################################
174
175
## @function canRead()
#
# Shorthand for checkPermission($path, 'r', ...): may the user read $path?
# Any further arguments (username, group(s)) are forwarded unchanged.
#
sub canRead
{
  my ($path, @extra) = @_;
  return checkPermission($path, 'r', @extra);
}
## canRead()
184
185
## @function checkPermission()
#
# Decide whether a user may perform a given kind of access on an HDFS path,
# using the owner, group and permission string returned by the Thrift
# server's stat() call.
#
# Exactly one permission class is consulted, as in the HDFS permission
# model: the owner bits if the user owns the file, otherwise the group bits
# if the file's group is one of the user's groups, otherwise the 'other'
# bits. An owner denied by the owner bits is therefore denied even when the
# group/other bits would allow the access.
#
# @param  $path       HDThriftFS:// URI or HadoopFS::Pathname
# @param  $mode       'r', 'w' or 'x'
# @param  $username   optional - defaults to the current login name
# @param  $usergroup  optional - a group name, or a hash ref keyed by group
#                     name; defaults to the output of `groups` (non-Windows)
# @return 1 if the access is permitted, otherwise 0
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  # character offset of each access type within an 'rwx' triple
  my $offsets = {'r' => 0, 'w' => 1, 'x' => 2};
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  # - determine the user (defaults to current user)
  if (!defined $username)
  {
    if ($ENV{'GSDLOS'} =~ /^windows$/i)
    {
      require Win32;
      $username = Win32::LoginName();
    }
    else
    {
      $username = getlogin || getpwuid($<);
    }
  }
  # - determine the group(s)
  my $usergroups = {};
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  elsif ($ENV{'GSDLOS'} !~ /^windows$/i)
  {
    # No group lookup is implemented for Windows. Elsewhere ask `groups`;
    # split on ' ' (awk-style) so runs of whitespace never produce an empty
    # group name.
    my $raw_groups = `groups`;
    if (defined $raw_groups)
    {
      foreach my $group (split(' ', $raw_groups))
      {
        $usergroups->{$group} = 1;
      }
    }
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  # Select the single class that applies: [u]ser, else [g]roup, else [o]ther.
  # NOTE(review): assumes 'permission' is a 9-character 'rwxrwxrwx' style
  # string with no leading file-type character - confirm against the server.
  my $class_offset = 6;
  if (defined $owner && $username eq $owner)
  {
    $class_offset = 0;
  }
  elsif (defined $group && defined $usergroups->{$group})
  {
    $class_offset = 3;
  }
  my $target_char = substr($permissions, $class_offset + $offsets->{$mode}, 1);
  return (defined $target_char && $mode eq $target_char) ? 1 : 0;
}
## checkPermission()
268
269
## @function closeFileHandle()
#
# Close a handle produced by openFileHandle() and remove the
# FileUtils::HDThriftFS::ThriftFH tie from it.
#
# @param  $fh_ref  reference to the variable holding the tied glob reference
# @return 1
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  _printDebug('');
  close($$fh_ref);
  # openFileHandle() tied the glob itself (tie *$fh ...), so the glob must be
  # untied. 'untie $$fh_ref' would only look for a tie on the scalar that
  # holds the glob reference and leave the handle tied.
  untie(*{$$fh_ref});
  return 1;
}
## closeFileHandle()
281
282
## @function fileSize()
#
# Report the 'length' field of the Thrift stat() result for an HDFS path.
# Only the first argument is used.
#
sub fileSize
{
  my $path = shift;
  _establishClient();
  my $stat = $thrift_client->stat(_generateHDFSPath($path));
  return $stat->{length};
}
## fileSize()
296
297
## @function fileTest()
#
# Emulate the Perl file test operators -e, -d and -f against HDFS. A missing
# operator, or -l (HDFS has no symbolic links), is treated as -e. Any other
# operator is reported through FileUtils::printError() and yields 0.
#
# @return 1 if the test passes, otherwise 0
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');

  # each check runs only after the path is known to exist
  my %check_for = (
    '-e' => sub { return 1; },
    '-d' => sub { return $thrift_client->stat($path)->{'isdir'} ? 1 : 0; },
    '-f' => sub { return $thrift_client->stat($path)->{'isdir'} ? 0 : 1; },
  );
  if (!exists $check_for{$test_op})
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }
  return 0 unless $thrift_client->exists($path);
  return $check_for{$test_op}->();
}
## fileTest()
349
350
## @function filenameConcatenate()
#
# Build a path by joining the given components with '/', collapsing runs of
# slashes, and prefixing "<protocol>/". A single trailing '/' or '\' is then
# removed. Slashes introduced by the protocol itself are left alone, so the
# result may contain more than two slashes after the scheme.
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  # squeeze every run of '/' down to one
  (my $joined = join('/', @parts)) =~ tr{/}{}s;
  my $filename = "$protocol/$joined";
  $filename =~ s{[\\/]$}{};
  return $filename;
}
## filenameConcatenate()
366
367
## @function isFilenameAbsolute()
#
# Paths handed to this HDFS driver are always absolute, so the answer is 1
# whatever the argument.
#
sub isFilenameAbsolute { return 1; }
## isFilenameAbsolute()
376
377
## @function makeDirectory()
#
# Ensure an HDFS directory exists, asking the server to create it (mkdirs)
# only when fileTest() says it is missing.
#
# @return the result of a final fileTest($path, '-d')
#
sub makeDirectory
{
  my ($raw_path) = @_;
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  $thrift_client->mkdirs($path) unless fileTest($path, '-d');
  # success is judged by the directory now being present
  return (fileTest($path, '-d'));
}
## makeDirectory()
397
398
## @function modificationTime()
#
# Return the modification time of an HDFS path in whole seconds: the stat()
# 'modificationTime' value divided by 1000 and rounded down.
#
sub modificationTime
{
  my $path = shift;
  _establishClient();
  my $millis = $thrift_client->stat(_generateHDFSPath($path))->{modificationTime};
  return floor($millis / 1000);
}
## modificationTime()
412
413
## @function openFileHandle()
#
# Create a file-handle-like object for an HDFS path. A fresh glob is tied to
# FileUtils::HDThriftFS::ThriftFH (which receives the shared Thrift client),
# opened, and stored through $fh_ref. Dies if the tied open fails.
#
# @return 1
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  _printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  _establishClient();
  my $path = _generateHDFSPath($raw_path);
  my $glob = gensym();
  tie(*$glob, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  # For a tied handle open() forwards its arguments to the tie class's OPEN
  # method; here that is the HDFS path followed by the mode.
  open($glob, $path, $mode) or die("Failed to open thriftfs");
  $$fh_ref = $glob;
  return 1;
}
## openFileHandle()
431
432
## @function readDirectory()
#
# List an HDFS directory via the Thrift listStatus() call and return a
# reference to an array of entry names (the last component of each entry's
# 'path').
#
sub readDirectory
{
  my ($raw_path) = @_;
  _establishClient();
  my $listing = $thrift_client->listStatus(_generateHDFSPath($raw_path));
  return [] unless ($listing && @{$listing});
  my @names = map { my ($leaf) = $_->{'path'} =~ /([^\\\/]+)$/; $leaf } @{$listing};
  return \@names;
}
## readDirectory()
455
456
## @function removeFiles()
#
# Delete an HDFS path. Without $recursive (default 0) only plain files are
# deleted; with it, anything that exists is removed recursively.
#
# @return true when the path existed, was eligible and is now gone; else 0
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 unless defined $recursive;
  _establishClient();
  # - convert the path into a proper thrift path object as necessary
  $path = _generateHDFSPath($path);
  return 0 unless $thrift_client->exists($path);
  return 0 unless ($recursive || fileTest($path, '-f'));
  $thrift_client->rm($path, $recursive);
  return !$thrift_client->exists($path);
}
## removeFiles()
479
480
## @function removeFilesFiltered()
#
# Depth-first removal of HDFS files (and, when unfiltered, directories).
#
# A file is removed unless it matches $reject_re, and only if it matches
# $accept_re when one is given. Directories are always descended into, but
# are themselves removed only when neither pattern was supplied.
#
# @param  $paths      a single path or an array ref of paths
# @param  $accept_re  optional pattern a file path must match
# @param  $reject_re  optional pattern that protects a matching file
# @return number of files/directories removed
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  _establishClient();
  my @targets = (ref $paths eq "ARRAY") ? @$paths : ($paths);
  my $removed = 0;
  TARGET: foreach my $target (@targets)
  {
    # remove trailing slashes
    $target =~ s/[\/\\]+$//;
    my $hdfs_path = _generateHDFSPath($target);

    if (!$thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $target . "\n";
      next TARGET;
    }

    if (!fileTest($hdfs_path, '-d'))
    {
      # a file: keep it if rejected, or if an accept pattern fails to match
      next TARGET if (defined $reject_re && $target =~ m/$reject_re/);
      next TARGET unless (!defined $accept_re || $target =~ m/$accept_re/);
      $thrift_client->rm($hdfs_path, 0);
      if ($thrift_client->exists($hdfs_path))
      {
        print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove file: " . $target . "\n";
      }
      else
      {
        $removed++;
      }
      next TARGET;
    }

    # a directory: deal with its contents first...
    foreach my $child (@{readDirectory($hdfs_path)})
    {
      $removed += removeFilesFiltered($target . '/' . $child, $accept_re, $reject_re);
    }
    # ...then the directory itself, but only when nothing is being filtered.
    # rm is non-recursive so it fails if anything is (somehow) still inside.
    next TARGET if (defined $accept_re || defined $reject_re);
    $thrift_client->rm($hdfs_path, 0);
    if ($thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove directory: " . $target . "\n";
    }
    else
    {
      $removed++;
    }
  }
  return $removed;
}
## removeFilesFiltered()
548
549
## @function removeFilesRecursive()
#
# Remove a path (or array ref of paths) and everything beneath it by calling
# removeFilesFiltered() with no accept or reject patterns.
#
# @return the number of entries removed, as counted by removeFilesFiltered()
#
sub removeFilesRecursive
{
  my $path = shift;
  return removeFilesFiltered($path, undef, undef);
}
## removeFilesRecursive()
560
561
## @function supportsSymbolicLink()
#
# Symbolic links are not supported within HDFS, so this always returns 0.
#
sub supportsSymbolicLink { return 0; }
## supportsSymbolicLink()
569
570
## @function transferFile()
#
# Copy ('COPY') or move ('MOVE') a file within HDFS. When $dst is an existing
# directory the file keeps its name inside it. Fails (printError + 0) if the
# source is missing or the destination already exists.
#
# MOVE is a server-side rename; COPY streams the data through this client in
# $buffer_length sized read()/write() requests.
#
# @return whether the destination exists afterwards
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    my $fhin = $thrift_client->open($src_path);
    my $fhout = $thrift_client->create($dst_path);
    # Data returned by read() is handed to write() unchanged. Thrift appears
    # to only support writing strings (not bytes), which is fine for text
    # files; binary content relies on the server side handling.
    my $offset = 0;
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $buffer_length);
      # Test definedness/length rather than truth: a chunk consisting of
      # the single character "0" is false in Perl and would otherwise end
      # the copy early.
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      $offset += $buffer_length;
      last if (length($data) < $buffer_length);
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()
638
639
## @function transferFileFromLocal()
#
# Copy ('COPY') or move ('MOVE') a local file into HDFS. When $dst is an
# existing HDFS directory the file keeps its name inside it; an existing
# destination file is never replaced.
#
# The local file is read raw in $buffer_length byte chunks and each chunk is
# Base91 encoded before being written, so binary content survives Thrift's
# string-based write().
#
# @return true if the destination exists afterwards (and, for MOVE, the local
#         source was removed); 0 on failure
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $bytes_read;
  my $fhout = $thrift_client->create($dst_path);
  while ($bytes_read = read($fhin, $decoded, $buffer_length))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary - we add a Byte Order Marker so the
    # Thrift Server can detect the need to decode the string sent
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  # read() returns 0 at end of file but undef on an I/O error. Capture the
  # error text now, before close() has a chance to overwrite $!.
  my $read_error = (defined $bytes_read) ? undef : "$!";
  close($fhin);
  $thrift_client->close($fhout);
  if (defined $read_error)
  {
    # Only part of the source reached HDFS: discard the incomplete copy and
    # report failure - crucially, never unlink the source of a MOVE.
    $thrift_client->rm($dst_path, 0);
    &FileUtils::printError('Failed to read source file (during ' . $mode . '): ' . $src . ' (' . $read_error . ')');
    return 0;
  }
  # in general, the transfer has worked if the destination file exists
  my $result = $thrift_client->exists($dst_path);
  # if moving, remove the source file from the local filesystem
  if ($mode eq 'MOVE')
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = $result && (!-f $src);
  }
  return $result;
}
## transferFileFromLocal()
697
698
## @function transferFileToLocal()
#
# Copy ('COPY') or move ('MOVE') an HDFS file to the local filesystem. When
# $dst is an existing local directory the file keeps its name inside it; an
# existing local destination is never replaced.
#
# Chunks are requested in $buffer_length sized reads, Base91 decoded, and
# written raw. A decoded chunk shorter than $buffer_length marks the end.
#
# @return true if the local file exists afterwards (and, for MOVE, the HDFS
#         source was removed); 0 on failure
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  my $offset = 0;
  my $write_error;
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $buffer_length);
    # Stop on an undefined/empty chunk. Testing truth instead would also
    # stop on a chunk that is just the string "0".
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    if (!print {$fhout} $decoded)
    {
      $write_error = "$!";
      last;
    }
    # a short chunk means the end of the remote file has been reached
    last if (length($decoded) < $buffer_length);
    $offset += $buffer_length;
  }
  # buffered write errors (e.g. a full disk) may only be reported by close()
  if (!close($fhout) && !defined $write_error)
  {
    $write_error = "$!";
  }
  $thrift_client->close($fhin);
  if (defined $write_error)
  {
    # The local copy is incomplete: remove it and report failure - crucially,
    # never delete the HDFS source of a MOVE.
    unlink($dst);
    &FileUtils::printError('Failed to write destination file (during ' . $mode . '): ' . $dst . ' (' . $write_error . ')');
    return 0;
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (-f $dst);
  # if moving, remove the source file from the HDFS filesystem
  if ($mode eq 'MOVE')
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = $result && !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
764
765
7661;
Note: See TracBrowser for help on using the repository browser.