###############################################################################
#
# parallelbuildinginexport.pm -- support parallel import.pl by extending
# inexport with parallel processing awareness and functionality
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# @author Hussein Suleman [h1], University of Cape Town, South Africa
# @author John Thompson [jmt12], Greenstone DL Research Group

package parallelbuildinginexport;

# Pragma
use strict;
no strict 'refs'; # allow filehandles to be variables and vice versa
no strict 'subs'; # allow barewords (eg STDERR) as function arguments

# MODULES
# Randomize the order of files in the filelist
use List::Util qw( shuffle );
use Time::HiRes qw( gettimeofday tv_interval );

# Greenstone Modules
use gsprintf 'gsprintf';
use inexport;

our $start_time;

BEGIN
{
  @parallelbuildinginexport::ISA = ('inexport');
}

END
{
  if (defined $start_time)
  {
    my $end_time = [&gettimeofday()];
    my $duration = tv_interval($start_time, $end_time);
    print &makeHeader('Parallel Import Complete') . "\n";
    print ' Ended: ' . @{$end_time}[0] . '.' . @{$end_time}[1] . "\n";
    print ' Duration: ' . sprintf('%0.6f', $duration) . "\n";
    print '=' x 80 . "\n";
  }
}
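# A usage sketch (the collection name 'demo' and the worker/batch values are
# hypothetical):
#
#   import.pl -removeold -workers 4 -batchsize 100 demo
#
# Passing any of the arguments declared below causes import.pl to instantiate
# this subclass in place of inexport; _farmOutProcesses() then invokes mpirun
# with -n 5 (one master plus the four workers), and each worker claims 100
# files per batch.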
"\n"; } } # jobs and epoch added for parallel processing [hs, 1 july 2010] # added aliases 'workers' and 'batchsize' [jmt12] my $arguments = [ { 'name' => "workers", 'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing", 'type' => "int", 'range' => "0,", 'reqd' => "no", 'hiddengli' => "yes" }, { 'name' => "batchsize", 'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing", 'type' => "int", 'range' => "1,", 'reqd' => "no", 'hiddengli' => "yes" }, { 'name' => "nolocal", 'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only", 'type' => "flag", 'reqd' => "no", 'hiddengli' => "yes" }, { 'name' => "jobs", 'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1", 'type' => "int", 'range' => "1,", 'reqd' => "no", 'internal' => "1", 'hiddengli' => "yes" }, { 'name' => "epoch", 'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above", 'type' => "int", 'range' => "1,", 'reqd' => "no", 'internal' => "1", 'hiddengli' => "yes" }, ]; # @function new() # Constructor # sub new { my $class = shift(@_); my $self = new inexport(@_); $start_time = [&gettimeofday()]; # Legacy support - Dr Suleman initially had different names for these # arguments, namely jobs and epoch if ($self->{'workers'} eq '' && $self->{'jobs'} ne '') { $self->{'workers'} = $self->{'jobs'} - 1; } if ($self->{'batchsize'} eq '' && $self->{'epoch'} ne '') { $self->{'batchsize'} = $self->{'epoch'}; } # Sanity Check if ($self->{'batchsize'} !~ /^\d+$/) { print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n"; $self->{'batchsize'} = 1; } if ($self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1) { print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n"; $self->{'workers'} = 0; } else { my $message = 'Performing Parallel Import'; print &makeHeader($message) . "\n"; print ' Started: ' . @{$start_time}[0] . '.' . @{$start_time}[1] . "\n"; print ' Workers: ' . $self->{'workers'} . "\n"; print ' Batchsize: ' . $self->{'batchsize'} . "\n"; print '=' x 80 . "\n"; if (!$self->{'removeold'}) { print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n"; sleep(3); #just in case $self->{'removeold'} = 1; } } return bless($self, $class); } # new() ## @function # sub set_collection_options { my $self = shift(@_); my ($collectcfg) = @_; $self->SUPER::set_collection_options($collectcfg); $self->{'collectcfg'} = $collectcfg; } ## set_collection_options() ## ## @function deinit() # @function _farmOutProcesses() # Index the files in parallel using MPI farmer to farm off multiple processes # @author hs, 1 july 2010 # sub _farmOutProcesses { my $self = shift(@_); my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_; my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp'); if (!-d $tmp_dir_path) { mkdir($tmp_dir_path, 0777); } # create the list of files to import my $overwrite = 1; my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt"); # - if the file is already there (which is should be during testing) then # don't regenerate. This is especially important for imports of 1 million # documents as just the directory scan can take several hours. 
  if ($overwrite || !-f $tmp_filelist)
  {
    open (my $filelist, '>', $tmp_filelist) or die("Failed to open $tmp_filelist for writing");
    my @filenames = keys %{$block_hash->{'all_files'}};
    @filenames = shuffle(@filenames);
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      if ((!exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close($filelist);
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallelizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    $mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';
  # fix for mpi binding to incorrect interface device (seems to have an
  # unhealthy obsession with virbr0)
  $mpi_flags .= ' --mca btl tcp,sm,self --mca btl_tcp_if_include eth0 ';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume the path is correct to find these executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  open(MPI, $mpi_cmd . " |") or die("Couldn't Execute MPI");
  while ( defined( my $line = <MPI> ) )
  {
    chomp($line);
    print "$line\n";
  }
  close(MPI);
}
## _farmOutProcesses()

## @function getSupportedArguments()
#
# Retrieve the list of arguments that are specific to this subclass of inexport
# so they can be added to the list of supported arguments to import.pl. The use
# of any of these arguments automatically causes this subclass to be
# instantiated and used in preference to the parent class. ATM it is up to the
# implementer to ensure these arguments are unique.
#
sub getSupportedArguments
{
  return $arguments;
}
## getSupportedArguments()

################################################################################
##### Overrides
################################################################################

## @function perform_process_files()
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel Import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Merge the individual parts into one single database
  my $infodb_type = $self->{'collectcfg'}->{'infodbtype'};
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  if (-f $mpi_conf_path && &dbutil::supportsMerge($infodb_type))
  {
    print STDERR ' * Merging ' . $infodb_type . ' databases... ';
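    # Each worker node writes partial databases whose filenames carry that
    # node's hostname as a suffix; the loop below folds each partial into the
    # suffix-less master database. A hypothetical GDBM example:
    #   archiveinf-doc-node01.gdb + archiveinf-doc-node02.gdb -> archiveinf-doc.gdb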
    my @databases = ('archiveinf-src', 'archiveinf-doc');
    foreach my $database (@databases)
    {
      # generate the path to the target database without any hostname suffix
      my $archive_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, '');
      #print STDERR " - merging to: " . $archive_db_path . "\n";
      open(MPIHOSTS, '<:utf8', $mpi_conf_path) or die("Failed to open mpi.conf for reading");
      my $line;
      while ($line = <MPIHOSTS>)
      {
        if ($line =~ /^([a-z0-9\-]+)/i)
        {
          my $hostname = $1;
          #print STDERR " - searching for database for: " . $hostname . "\n";
          my $mergable_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, $hostname);
          #print STDERR "[debug] " . $mergable_db_path . "\n";
          if (-f $mergable_db_path)
          {
            #print STDERR " - merge $mergable_db_path > $archive_db_path\n";
            &dbutil::merge_databases($infodb_type, $mergable_db_path, $archive_db_path);
          }
        }
      }
      close(MPIHOSTS);
    }
    print "Done!\n";
  }
}
## perform_process_files()

## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
#
# @author hs, 1 july 2010
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};
  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    if (open(STATS, ">$statsfile"))
    {
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics()

## @function makeHeader($msg, [$length])
#
# Create a centered header string for a given message, padded with '='
# characters to the desired length.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string centered with '=' as padding
#
sub makeHeader
{
  my ($msg, $length) = @_;
  if (!defined $length)
  {
    $length = 80;
  }
  my $filler_length = ($length - 2 - length($msg)) / 2;
  my $filler = '=' x $filler_length;
  if (length($msg) % 2 == 1)
  {
    $msg = $filler . ' ' . $msg . ' =' . $filler;
  }
  else
  {
    $msg = $filler . ' ' . $msg . ' ' . $filler;
  }
  return $msg;
}
## makeHeader()

1;
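# Worked example for makeHeader(): with the default length of 80, the
# 26-character message used in new() is centered between two runs of 26 '='
# characters:
#   makeHeader('Performing Parallel Import')
#   ========================== Performing Parallel Import ==========================
# For odd-length messages, '=' x $filler_length truncates the half-width, and
# the extra '=' appended in the odd branch restores the full 80 columns.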