source: gs2-extensions/parallel-building/trunk/src/perllib/FileUtils/HDThriftFS.pm@ 27424

Last change on this file since 27424 was 27424, checked in by jmt12, 11 years ago

Adding canRead() and isAbsolute() functions, and some more debugging while tracking down issue with files of 4000-8000 bytes failing

File size: 19.0 KB
RevLine 
[27386]1################################################################################
2#
3# HDThriftFS.pm -- file functions acting upon a HDFS via thrift
4#
5# A component of the Greenstone digital library software from the New Zealand
6# Digital Library Project at the University of Waikato, New Zealand.
7#
8# Copyright (C) 2013 New Zealand Digital Library Project
9#
10# This program is free software; you can redistribute it and/or modify it under
11# the terms of the GNU General Public License as published by the Free Software
12# Foundation; either version 2 of the License, or (at your option) any later
13# version.
14#
15# This program is distributed in the hope that it will be useful, but WITHOUT
16# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
18# details.
19#
20# You should have received a copy of the GNU General Public License along with
21# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
22# Ave, Cambridge, MA 02139, USA.
23#
24###############################################################################
25
26# Thrift acts as client-server 'relay' between the Perl code and the HDFS. It
27# allows for persistent connections and so is significantly faster than
28# repeatedly starting Hadoop's Java application. In order to
29# connect to the Thrift server this code needs to know the host and port the
30# server may be found on - information currently hard-coded near the top of the
31# script. There are also a number of Perl module API 'bindings' generated by
32# the Thrift compilation process... currently located within the packages of
33# the Parallel Processing extension. Note that I make use of some tie() magic
34# so as to provide calling code with 'file handle'-like objects to interact
35# with (print, readline etc), so that is pretty cool.
36
37package FileUtils::HDThriftFS;
38
39# Pragma
40use strict;
41
# Setup Environment
# - abort early if any environment variable needed to locate the Thrift Perl
#   modules is missing, then prepend those module directories to @INC
BEGIN
{
  foreach my $required_var ('GEXTPARALLELBUILDING', 'GSDLOS', 'PERL_VERSION')
  {
    die $required_var . " not set\n" if (!defined $ENV{$required_var});
  }
  my $extension_dir = $ENV{'GEXTPARALLELBUILDING'};
  # Bit::Vector and Thrift modules
  unshift(@INC, join('/', $extension_dir, $ENV{'GSDLOS'}, 'lib', 'perl', $ENV{'PERL_VERSION'}));
  # ThriftFS Perl API
  unshift(@INC, $extension_dir . '/packages/hadoop-1.1.0/contrib/thriftfs/gen-perl');
}
53
54# Modules - Core
55use Devel::Peek;
56use MIME::Base64;
57use POSIX qw(floor);
58use Symbol;
59use Thrift::Socket;
60use Thrift::BufferedTransport;
61use Thrift::BinaryProtocol;
62# Modules - Thrift
63use HadoopFS::FileSystem;
64# Modules - Greenstone
65use FileUtils::HDThriftFS::ThriftFH;
66use MIME::Base91;
67
# Configuration
# - where the Thrift server relaying requests to HDFS is listening
my $host = "localhost";
my $port = 58660;
# - when true, _printDebug() writes trace messages to STDERR
my $debug = 0;
# - when true, the local<->HDFS transfer functions Dump() every chunk before
#   and after Base91 encoding/decoding
my $debug_encoding = 0;

# Globals
# - populated by _establishClient() and shared by every function below
my ($transport, $thrift_client);
77
78
## @function END()
#
# At interpreter shutdown, close the Thrift transport if _establishClient()
# ever created one.
#
END
{
  $transport->close() if (defined $transport);
}
## END()
91
92
## @function _establishClient()
#
# Lazily connect to the Thrift server at $host:$port. The transport and
# client are only stored in the module globals once the transport has been
# opened successfully, so after a failed attempt a later call will retry.
# Once connected, further calls do nothing.
#
sub _establishClient
{
  return if (defined $thrift_client);
  &_printDebug('Create Thrift Client to ' . $host . ':' . $port);
  my $socket = Thrift::Socket->new($host, $port);
  $socket->setSendTimeout(10000);
  $socket->setRecvTimeout(20000);

  my $new_transport = Thrift::BufferedTransport->new($socket);
  my $protocol = Thrift::BinaryProtocol->new($new_transport);
  my $new_client = HadoopFS::FileSystemClient->new($protocol);

  eval { $new_transport->open(); };
  if ($@)
  {
    my $error = $@;
    # Thrift errors are expected to be exception objects carrying a 'message'
    # field, but a plain-string die must be reported too instead of failing
    # with "Can't use string as a HASH ref" under strict refs
    my $message = ref($error) ? $error->{message} : $error;
    &FileUtils::printError('Unable to connect: ' . $message, 1);
    return;
  }
  $transport = $new_transport;
  $thrift_client = $new_client;
  return;
}
## _establishClient()
116
117
## @function _generateHDFSPath()
#
# Convert an 'HDThriftFS://...' URI into the HadoopFS::Pathname object
# expected by the Thrift client. Values that are already HadoopFS::Pathname
# objects are returned unchanged.
#
# @param  $path  URI string, or an existing HadoopFS::Pathname
# @return HadoopFS::Pathname for the part of the URI after 'HDThriftFS://'.
#         For an invalid URI the error is reported and the value is returned
#         as given.
#
sub _generateHDFSPath
{
  my ($path) = @_;
  return $path if (ref($path) eq 'HadoopFS::Pathname');
  my $prefix = 'HDThriftFS://';
  # The protocol must start the string, as exactly its length is stripped off
  if (!defined $path || index($path, $prefix) != 0)
  {
    # Report via FileUtils::printError() like the rest of this module (no
    # _printError() exists here). The trailing 1 matches the connection
    # failure report in _establishClient() - presumably marks it fatal; confirm
    &FileUtils::printError('Not a valid thrift URI: ' . (defined $path ? $path : ''), 1);
    return $path;
  }
  return HadoopFS::Pathname->new( { pathname => substr($path, length($prefix)) } );
}
## _generateHDFSPath()
137
138
## @function _printDebug()
#
# When $debug is set, write a message to STDERR prefixed with '[DEBUG]' and
# the fully-qualified name of the function that called _printDebug().
#
# @param  $msg  text to print
#
sub _printDebug
{
  my $msg = shift(@_);
  return if (!$debug);
  # frame 1 is our caller; element 3 of caller() is its subroutine name
  my $function = (caller(1))[3];
  print STDERR '[DEBUG] ' . $function . ': ' . $msg . "\n";
}
## _printDebug()
151
[27386]152################################################################################
153################################################################################
154################################################################################
155
156
## @function canRead()
#
# Convenience wrapper asking checkPermission() about read ('r') access.
#
# @param  $path  HDThriftFS:// URI or HadoopFS::Pathname to test
# @param  ...    optional username and group(s), forwarded unchanged
# @return the result of checkPermission() for mode 'r'
#
sub canRead
{
  my ($path, @identity) = @_;
  return checkPermission($path, 'r', @identity);
}
## canRead()
165
166
## @function checkPermission()
#
# Decide whether a user may access a file in HDFS in the given mode. The
# decision uses the owner, group and permission string returned by the
# Thrift server's stat(). As HDFS itself does, exactly one permission class
# is consulted:
# - the [u]ser bits if the user owns the file;
# - otherwise the [g]roup bits if the user is in the file's group;
# - otherwise the [o]ther bits.
#
# @param  $path      HDThriftFS:// URI or HadoopFS::Pathname of the file
# @param  $mode      one of 'r', 'w' or 'x'
# @param  $username  (optional) user to test; defaults to the current user
# @param  $usergroup (optional) a group name, or a hashref keyed by group
#                    name; defaults to the output of `groups` (non-Windows)
# @return 1 if the access is permitted, otherwise 0
#
sub checkPermission
{
  my ($path, $mode, $username, $usergroup) = @_;
  my $offsets = {'r' => 0, 'w' => 1, 'x' => 2};
  # - reject modes we cannot map onto a permission column
  if (!defined $mode || !exists $offsets->{$mode})
  {
    &FileUtils::printError('Unknown or unsupported permission mode: ' . (defined $mode ? $mode : ''));
    return 0;
  }
  # - ensure we have a connection to the thrift server
  &_establishClient();
  # - convert the path into a proper thrift path object
  $path = &_generateHDFSPath($path);
  # - determine the user (defaults to current user)
  $username = &_currentUsername() if (!defined $username);
  # - determine the group(s) the user belongs to
  my $usergroups;
  if (defined $usergroup)
  {
    $usergroups = (ref $usergroup eq "HASH") ? $usergroup : {$usergroup => 1};
  }
  else
  {
    $usergroups = &_currentUserGroups();
  }
  # Retrieve details from the file
  my $file_stat = $thrift_client->stat($path);
  my $owner = $file_stat->{'owner'};
  my $group = $file_stat->{'group'};
  my $permissions = $file_stat->{'permission'};
  # - nothing to grant access from without a permission string
  return 0 if (!defined $permissions);
  # Pick the single class that applies. The string is assumed to be laid out
  # as 'rwxrwxrwx' (user, group, other) - TODO confirm against the server
  my $class_offset = 6;
  if (defined $owner && defined $username && $username eq $owner)
  {
    $class_offset = 0;
  }
  elsif (defined $group && defined $usergroups->{$group})
  {
    $class_offset = 3;
  }
  my $target_char = substr($permissions, $class_offset + $offsets->{$mode}, 1);
  return (defined $target_char && $target_char eq $mode) ? 1 : 0;
}
## checkPermission

## @function _currentUsername()
#
# Login name of the user running this process (Win32::LoginName() on
# Windows, otherwise getlogin or the password entry for the real uid).
#
sub _currentUsername
{
  if ($ENV{'GSDLOS'} =~ /^windows$/i)
  {
    require Win32;
    return Win32::LoginName();
  }
  my $name = getlogin || getpwuid($<);
  return $name;
}
## _currentUsername()

## @function _currentUserGroups()
#
# Hashref whose keys are the groups listed by the `groups` command. On
# Windows no groups are determined and the hashref is empty.
#
sub _currentUserGroups
{
  my $usergroups = {};
  # group membership is not determined on Windows
  return $usergroups if ($ENV{'GSDLOS'} =~ /^windows$/i);
  my $raw_groups = `groups`;
  return $usergroups if (!defined $raw_groups);
  # awk-style split on runs of whitespace, so no empty group name is recorded
  foreach my $group (split(' ', $raw_groups))
  {
    $usergroups->{$group} = 1;
  }
  return $usergroups;
}
## _currentUserGroups()
249
250
## @function closeFileHandle()
#
# Close a handle previously produced by openFileHandle() and remove the tie
# from its glob.
#
# @param  $fh_ref  reference to the scalar holding the tied glob reference
# @return 1 (always)
#
sub closeFileHandle
{
  my $fh_ref = shift(@_);
  &_printDebug('');
  close($$fh_ref);
  # The tie was placed on the glob itself (tie(*$fh, ...)), so it must be
  # removed from the glob. untie($$fh_ref) would target the (untied) scalar
  # and silently do nothing.
  untie(*{$$fh_ref});
  return 1;
}
## closeFileHandle()
262
263
## @function fileSize()
#
# Size of a file in HDFS, taken from the 'length' field of the Thrift
# server's stat() result.
#
# @param  $path  HDThriftFS:// URI or HadoopFS::Pathname of the file
# @return the stat()'s length value
#
sub fileSize
{
  my $path = shift(@_);
  # connect (if needed) before building the thrift path object
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($path);
  return $thrift_client->stat($hdfs_path)->{length};
}
## fileSize()
277
278
## @function fileTest()
#
# Emulate a subset of Perl's file test operators against HDFS.
#
# @param  $raw_path  HDThriftFS:// URI or HadoopFS::Pathname to test
# @param  $test_op   '-d', '-e', '-f' or '-l'; '-l' and undef behave as '-e'
#                    because HDFS has no symbolic links
# @return 1 if the test passes, otherwise 0 (also 0, after reporting an
#         error, for an unsupported operator)
#
sub fileTest
{
  my ($raw_path, $test_op) = @_;
  &_establishClient();
  my $path = &_generateHDFSPath($raw_path);
  # note: symbolic linking not supported within HDFS
  $test_op = '-e' if (!defined $test_op || $test_op eq '-l');
  if ($test_op ne '-d' && $test_op ne '-e' && $test_op ne '-f')
  {
    &FileUtils::printError('Unknown or unsupported test mode: ' . $test_op);
    return 0;
  }
  return 0 if (!$thrift_client->exists($path));
  return 1 if ($test_op eq '-e');
  # only '-d' and '-f' remain, and both depend on whether this is a directory
  my $is_directory = $thrift_client->stat($path)->{'isdir'} ? 1 : 0;
  return ($test_op eq '-d') ? $is_directory : 1 - $is_directory;
}
## fileTest()
330
331
## @function filenameConcatenate()
#
# Join path components under a protocol prefix.
#
# @param  $protocol  prefix placed in front of the joined path (followed by
#                    a '/', which may produce repeated slashes after it)
# @param  @parts     path components joined with '/'; runs of slashes within
#                    them are collapsed to one
# @return the combined path with a single trailing '/' or '\' removed
#
sub filenameConcatenate
{
  my ($protocol, @parts) = @_;
  (my $joined = join('/', @parts)) =~ s/[\/]+/\//g;
  my $filename = $protocol . '/' . $joined;
  $filename =~ s/[\\\/]$//;
  return $filename;
}
## filenameConcatenate()
347
348
## @function isFilenameAbsolute()
#
# File paths against HDFS must be absolute, so every filename is reported
# as absolute; any argument is ignored.
#
# @return 1 (always)
#
sub isFilenameAbsolute
{
  my $is_absolute = 1;
  return $is_absolute;
}
## isFilenameAbsolute()
357
358
## @function makeDirectory()
#
# Create a directory (and any missing parents, via mkdirs) in HDFS unless it
# already exists.
#
# @param  $raw_path  HDThriftFS:// URI or HadoopFS::Pathname of the directory
# @return the result of fileTest(..., '-d') afterwards
#
sub makeDirectory
{
  my ($raw_path) = @_;
  &_establishClient();
  my $directory = &_generateHDFSPath($raw_path);
  $thrift_client->mkdirs($directory) unless (&fileTest($directory, '-d'));
  # success means the directory is now present
  return &fileTest($directory, '-d');
}
## makeDirectory()
378
379
## @function modificationTime()
#
# Modification time of a file in HDFS. The stat()'s modificationTime is
# divided by 1000 and floored (presumably milliseconds to seconds).
#
# @param  $path  HDThriftFS:// URI or HadoopFS::Pathname of the file
# @return the floored modification time
#
sub modificationTime
{
  my ($path) = @_;
  &_establishClient();
  my $stat = $thrift_client->stat(&_generateHDFSPath($path));
  my $raw_mtime = $stat->{modificationTime};
  return floor($raw_mtime / 1000);
}
## modificationTime()
393
394
## @function openFileHandle()
#
# Provide a file-handle-like object for a file in HDFS. An anonymous glob is
# tied to FileUtils::HDThriftFS::ThriftFH (given the shared Thrift client)
# and opened, so calling code can print/readline on it as usual.
#
# @param  $raw_path  HDThriftFS:// URI of the file
# @param  $mode      open mode, passed through to the tied handle's open
# @param  $fh_ref    reference to a scalar that receives the handle
# @return 1 on success; dies if the tied open fails
#
sub openFileHandle
{
  my ($raw_path, $mode, $fh_ref) = @_;
  &_printDebug('path: ' . $raw_path . ', mode: ' . $mode . ', fh_ref');
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($raw_path);
  my $handle = gensym();
  tie(*$handle, "FileUtils::HDThriftFS::ThriftFH", $thrift_client);
  if (!open($handle, $hdfs_path, $mode))
  {
    die("Failed to open thriftfs");
  }
  $$fh_ref = $handle;
  return 1;
}
## openFileHandle()
412
413
## @function readDirectory()
#
# List the entries of an HDFS directory.
#
# @param  $raw_path  HDThriftFS:// URI or HadoopFS::Pathname of the directory
# @return arrayref of entry names (the last path component of each listed
#         path; undef for a path that ends in a separator)
#
sub readDirectory
{
  my ($raw_path) = @_;
  &_establishClient();
  my $hdfs_path = &_generateHDFSPath($raw_path);
  my $listing = $thrift_client->listStatus($hdfs_path);
  # a missing listing yields an empty list of names
  return [] if (!$listing);
  my @names = map { my ($leaf) = $_->{'path'} =~ /([^\\\/]+)$/; $leaf } @{$listing};
  return \@names;
}
## readDirectory()
436
437
## @function removeFiles()
#
# Remove a file from HDFS, or a whole directory tree when $recursive is set.
#
# @param  $path       HDThriftFS:// URI or HadoopFS::Pathname to remove
# @param  $recursive  (optional, default 0) required to remove directories
# @return 0 if nothing was attempted (missing path, or a directory without
#         $recursive); otherwise true iff the path is gone after rm
#
sub removeFiles
{
  my ($path, $recursive) = @_;
  $recursive = 0 if (!defined $recursive);
  &_establishClient();
  my $target = &_generateHDFSPath($path);
  return 0 if (!$thrift_client->exists($target));
  # directories are only removed when recursion was explicitly requested
  return 0 if (!$recursive && !&fileTest($target, '-f'));
  $thrift_client->rm($target, $recursive);
  return !$thrift_client->exists($target);
}
## removeFiles()
460
461
## @function removeFilesFiltered()
#
# Depth-first removal of files (and, when no filters are given, directories)
# from HDFS.
#
# Files are removed when they do not match $reject_re and either $accept_re
# is undefined or they match it. Directories are descended into; they are
# themselves removed (non-recursively) only when neither expression is given.
#
# @param  $paths      a URI string or an arrayref of them
# @param  $accept_re  (optional) pattern files must match to be removed
# @param  $reject_re  (optional) pattern that protects matching files
# @return number of files and directories removed
#
sub removeFilesFiltered
{
  my ($paths, $accept_re, $reject_re) = @_;
  &_establishClient();
  my @targets = (ref $paths eq "ARRAY") ? @{$paths} : ($paths);
  my $removed_count = 0;
  TARGET:
  foreach my $target (@targets)
  {
    # remove trailing slashes (only our own copy of the path is changed)
    $target =~ s/[\/\\]+$//;
    my $hdfs_path = &_generateHDFSPath($target);
    if (!$thrift_client->exists($hdfs_path))
    {
      print STDERR "HDThriftFS::removeFilesFiltered() path does not exist: " . $target . "\n";
      next TARGET;
    }
    if (&fileTest($hdfs_path, '-d'))
    {
      foreach my $child (@{&readDirectory($hdfs_path)})
      {
        $removed_count += &removeFilesFiltered($target . '/' . $child, $accept_re, $reject_re);
      }
      # filtered removals never prune directories
      next TARGET if (defined $accept_re || defined $reject_re);
      # non-recursive, so this fails if anything somehow remains inside
      $removed_count += &_removeSingleEntry($hdfs_path, 'directory', $target);
      next TARGET;
    }
    next TARGET if (defined $reject_re && $target =~ m/$reject_re/);
    next TARGET if (defined $accept_re && $target !~ m/$accept_re/);
    $removed_count += &_removeSingleEntry($hdfs_path, 'file', $target);
  }
  return $removed_count;
}
## removeFilesFiltered()

## @function _removeSingleEntry()
#
# Non-recursively rm one HDFS entry for removeFilesFiltered(). Returns 1 if
# the entry no longer exists afterwards; otherwise reports the failure on
# STDERR (naming it as a $kind) and returns 0.
#
sub _removeSingleEntry
{
  my ($hdfs_path, $kind, $display_path) = @_;
  $thrift_client->rm($hdfs_path, 0);
  return 1 if (!$thrift_client->exists($hdfs_path));
  print STDERR "HDThriftFS::removeFilesFiltered() couldn't remove " . $kind . ": " . $display_path . "\n";
  return 0;
}
## _removeSingleEntry()
529
530
## @function removeFilesRecursive()
#
# Remove a path and everything beneath it. This is removeFilesFiltered()
# with neither an accept nor a reject expression, which also removes the
# directories themselves.
#
# @param  $path  URI string or arrayref of them
# @return number of entries removed
#
sub removeFilesRecursive
{
  my $path = shift(@_);
  my ($accept_re, $reject_re) = (undef, undef);
  return removeFilesFiltered($path, $accept_re, $reject_re);
}
## removeFilesRecursive()
541
542
## @function supportsSymbolicLink
#
# Symbolic linking is not supported within HDFS, so this always says no.
#
# @return 0
#
sub supportsSymbolicLink
{
  my $supported = 0;
  return $supported;
}
## supportsSymbolicLink()
550
551
## @function transferFile()
#
# Move or copy a file within HDFS. If $dst names an existing directory the
# file keeps its own name inside that directory.
#
# @param  $mode  'MOVE' (rename on the server) or 'COPY' (read and rewrite
#                the content through the Thrift client in 4K chunks)
# @param  $src   HDThriftFS:// URI of the existing source file
# @param  $dst   HDThriftFS:// URI of the destination file or directory
# @return 0 if the source is missing or the destination already exists,
#         otherwise whether the destination exists afterwards
#
sub transferFile
{
  my ($mode, $src, $dst) = @_;
  &_printDebug('mode: ' . $mode . ', src: ' . $src . ', dst: ' . $dst);
  # ensure we have a connection to the thrift server
  &_establishClient();
  my $src_path = &_generateHDFSPath($src);
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if ($thrift_client->exists($dst_path))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # what happens next depends on the mode, and is either very easy or really
  # hard
  if ($mode eq 'MOVE')
  {
    $thrift_client->rename($src_path, $dst_path);
  }
  elsif ($mode eq 'COPY')
  {
    my $fhin = $thrift_client->open($src_path);
    my $fhout = $thrift_client->create($dst_path);
    # NOTE: thrift doesn't seem to support writing bytes - only strings - so
    # this works for text files but binary content may not survive unchanged
    my $offset = 0;
    my $length = 4096;
    # Read 4K blocks at a time
    while (1)
    {
      my $data = $thrift_client->read($fhin, $offset, $length);
      # test definedness/length rather than truth: a final chunk consisting
      # solely of "0" is false in Perl and would otherwise be dropped
      last if (!defined $data || length($data) == 0);
      $thrift_client->write($fhout, $data);
      # a short read means the end of the file was reached
      last if (length($data) < $length);
      $offset += $length;
    }
    # Close files
    $thrift_client->close($fhout);
    $thrift_client->close($fhin);
  }
  my $result = ($thrift_client->exists($dst_path));
  return $result;
}
## transferFile()
620
621
## @function transferFileFromLocal()
#
# Copy or move a local file into HDFS. The local file is read in 4K chunks,
# and each chunk is Base91-encoded before being written through the Thrift
# client. If $dst names an existing HDFS directory, the file keeps its own
# name inside it.
#
# @param  $mode  'COPY' or 'MOVE'. MOVE also unlinks the local source, but
#                only once the transfer has succeeded.
# @param  $src   local source file
# @param  $dst   HDThriftFS:// URI of the destination file or directory
# @return 0 if the destination already exists. Otherwise true only if the
#         whole source was read and the destination exists (and, for MOVE,
#         the source is gone). Dies if $src cannot be opened.
#
sub transferFileFromLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # destination is remote
  my $dst_path = &_generateHDFSPath($dst);
  if (&fileTest($dst_path, '-d'))
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
    $dst_path = &_generateHDFSPath($dst);
  }
  # can't replace - if the file already exists
  if (&fileTest($dst_path, '-f'))
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # copy the file
  my $fhin;
  open($fhin, '<:raw', $src) or die("Failed to open file for reading: " . $src . " (" . $! . ")");
  my $decoded = '';
  my $bytes_read;
  my $fhout = $thrift_client->create($dst_path);
  while ($bytes_read = read($fhin, $decoded, 4096))
  {
    if ($debug_encoding)
    {
      print STDERR "Writing Data: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    # Base91 encode to protect binary. The Thrift server is meant to detect
    # such strings (via a Byte Order Marker - presumably added by
    # MIME::Base91::encode; confirm) and decode them before writing
    my $encoded = MIME::Base91::encode($decoded);
    if ($debug_encoding)
    {
      print STDERR "Encoded: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    $thrift_client->write($fhout, $encoded);
  }
  # read() returns 0 at end-of-file but undef on an I/O error
  my $read_error = (defined $bytes_read) ? undef : "$!";
  close($fhin);
  $thrift_client->close($fhout);
  if (defined $read_error)
  {
    &FileUtils::printError('Failed to read file (during ' . $mode . '): ' . $src . ' (' . $read_error . ')');
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (!defined $read_error) && $thrift_client->exists($dst_path);
  # if moving, remove the local source - but never after a failed transfer
  if ($result && $mode eq 'MOVE')
  {
    unlink($src);
    # update result to reflect if we successfully removed the src file
    $result = !-f $src;
  }
  return $result;
}
## transferFileFromLocal()
679
680
## @function transferFileToLocal()
#
# Copy or move a file from HDFS to the local filesystem. Chunks read through
# the Thrift client are Base91-decoded before being written locally. If $dst
# is an existing local directory the file keeps its own name inside it.
#
# @param  $mode  'COPY' or 'MOVE'. MOVE also removes the HDFS source, but
#                only once the local copy was written and closed successfully.
# @param  $src   HDThriftFS:// URI of the source file
# @param  $dst   local destination file or directory
# @return 0 if the source is missing or the destination already exists.
#         Otherwise true only if the local file was fully written (and, for
#         MOVE, the source is gone). Dies if $dst cannot be opened.
#
sub transferFileToLocal
{
  my ($mode, $src, $dst) = @_;
  # ensure we have a connection to the thrift server
  &_establishClient();
  # source is remote
  my $src_path = &_generateHDFSPath($src);
  if (!$thrift_client->exists($src_path))
  {
    &FileUtils::printError('Source file (during ' . $mode . ') does not exist: ' . $src);
    return 0;
  }
  if (-d $dst)
  {
    my ($filename) = $src =~ /([^\\\/]+)$/;
    $dst .= '/' . $filename;
  }
  if (-e $dst)
  {
    &FileUtils::printError('Destination file (during ' . $mode . ') already exists: ' . $dst);
    return 0;
  }
  # open local file
  my $fhout;
  my $offset = 0;
  my $length = 4096; # Read 4K blocks at a time
  open($fhout, '>:raw', $dst) or die("Failed to open file for writing: " . $dst . " (" . $! . ")");
  my $fhin = $thrift_client->open($src_path);
  my $write_error;
  while (1)
  {
    my $encoded = $thrift_client->read($fhin, $offset, $length);
    # test definedness/length rather than truth, so a chunk of "0" is not
    # mistaken for end-of-file
    last if (!defined $encoded || length($encoded) == 0);
    if ($debug_encoding)
    {
      print STDERR "Reading Data: \n=== START ===\n"; Dump($encoded); print STDERR "\n=== END ===\n\n";
    }
    my $decoded = MIME::Base91::decode($encoded);
    if ($debug_encoding)
    {
      print STDERR "Decoded: \n=== START ===\n"; Dump($decoded); print STDERR "\n=== END ===\n\n";
    }
    my $printed = print {$fhout} $decoded;
    if (!$printed)
    {
      $write_error = "$!";
      last;
    }
    # NOTE(review): this compares the *encoded* length with the raw chunk
    # size; a final chunk whose encoding is >= 4096 chars simply costs one
    # more (empty) read before the loop ends
    last if (length($encoded) < $length);
    $offset += $length;
  }
  # buffered write errors (e.g. a full disk) may only be reported by close()
  if (!close($fhout) && !defined $write_error)
  {
    $write_error = "$!";
  }
  $thrift_client->close($fhin);
  if (defined $write_error)
  {
    &FileUtils::printError('Failed to write file (during ' . $mode . '): ' . $dst . ' (' . $write_error . ')');
  }
  # in general, the transfer has worked if the destination file exists
  my $result = (!defined $write_error) && (-f $dst);
  # if moving, remove the HDFS source - but never after a failed transfer
  if ($result && $mode eq 'MOVE')
  {
    $thrift_client->rm($src_path, 0);
    # update result to reflect if we successfully removed the src file
    $result = !$thrift_client->exists($src_path);
  }
  return $result;
}
## transferFileToLocal()
742
743
7441;
Note: See TracBrowser for help on using the repository browser.