root/gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm @ 29257

Revision 29257, 13.4 KB (checked in by jmt12, 5 years ago)

Allow for collection configuration to be passed down to the parallel import process (so we can get infodbtype), and support infodbtypes that require post-import merging (such as TDBCluster).

Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the  GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40use Time::HiRes  qw( gettimeofday tv_interval );
41
42# Greenstone Modules
43use gsprintf 'gsprintf';
44use inexport;
45
46our $start_time;
47
# Establish inheritance from inexport at compile time, so method resolution
# (including SUPER:: dispatch) is in place before the rest of the file runs.
BEGIN
{
  our @ISA = ('inexport');
}
52
# At interpreter shutdown, report the wall-clock duration of the parallel
# import - but only when new() actually recorded a start time.
END
{
  if (defined $start_time)
  {
    my $finish_time = [&gettimeofday()];
    my $elapsed = tv_interval($start_time, $finish_time);
    print &makeHeader('Parallel Import Complete') . "\n";
    print '  Ended:    ' . $finish_time->[0] . '.' . $finish_time->[1] . "\n";
    print '  Duration: ' . sprintf('%0.6f', $elapsed) . "\n";
    print '=' x 80 . "\n";
  }
}
65
# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
# Command line argument specifications this subclass contributes to
# import.pl (exposed via getSupportedArguments() below). 'jobs' and 'epoch'
# are retained as hidden legacy aliases for 'workers' and 'batchsize'.
my $arguments =
    [
     { 'name' => "workers",
       'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
       'type' => "int",
       'range' => "0,",
       'reqd' => "no",
       'hiddengli' => "yes" },
     { 'name' => "batchsize",
       'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
       'type' => "int",
       'range' => "1,",
       'reqd' => "no",
       'hiddengli' => "yes" },
     { 'name' => "nolocal",
       'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
       'type' => "flag",
       'reqd' => "no",
       'hiddengli' => "yes" },
     { 'name' => "jobs",
       'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
       'type' => "int",
       'range' => "1,",
       'reqd' => "no",
       'internal' => "1",
       'hiddengli' => "yes" },
     { 'name' => "epoch",
       'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
       'type' => "int",
       'range' => "1,",
       'reqd' => "no",
       'internal' => "1",
       'hiddengli' => "yes" },
    ];
101
102
# @function new()
# Constructor. Chains to inexport's constructor, then configures the
# parallel processing options: 'workers' (worker thread count) and
# 'batchsize' (documents per work unit), honouring the legacy aliases
# 'jobs' and 'epoch'. Falls back to a serial import when fewer than one
# worker is requested.
#
sub new
{
  my $class = shift(@_);
  # direct method call rather than indirect-object syntax ("new inexport")
  my $self = inexport->new(@_);

  $start_time = [&gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch. The defined() guards prevent
  # uninitialized-value warnings when an option was not supplied at all.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '') && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    # 'jobs' counted the master process as well; 'workers' does not
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '') && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print &makeHeader($message) . "\n";
    print '  Started:   ' . @{$start_time}[0] . '.' . @{$start_time}[1] . "\n";
    print '  Workers:   ' . $self->{'workers'} . "\n";
    print '  Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 80 . "\n";
    # parallel workers write concurrently into the archives directory, so
    # any stale contents must be removed first
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
154
155
## @function set_collection_options()
#
# Extends the parent implementation by additionally keeping a reference to
# the collection configuration, so later stages (such as the post-import
# infodb merge) can look up settings like 'infodbtype'.
#
sub set_collection_options
{
  my ($self, $collectcfg) = @_;
  $self->SUPER::set_collection_options($collectcfg);
  $self->{'collectcfg'} = $collectcfg;
}
## set_collection_options() ##
166
167
168## @function deinit()
169
170
# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes
# @param $workers    number of worker processes (the master adds one more)
# @param $batchsize  number of documents per work unit
# @param $importdir  path to the import directory the filenames are relative to
# @param $block_hash structure listing all candidate files and blocked files
# @param $collection collection name passed through to the farmer
# @param $site       site name (non-empty implies a Greenstone 3 install)
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777) or die("Failed to create temporary directory: $tmp_dir_path ($!)");
  }

  # create the list of files to import
  # - if the file is already there then don't regenerate it. This is
  #   especially important for imports of 1 million documents as just the
  #   directory scan can take several hours. NOTE: $overwrite is currently
  #   hard-coded on, so the list is always regenerated; set it to 0 to
  #   reuse an existing filelist.txt during testing.
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  if ($overwrite || !-f $tmp_filelist)
  {
    # three-argument open with a lexical handle, and check the result (the
    # original unchecked two-argument open would silently lose the list)
    open(my $filelist, '>', $tmp_filelist) or die("Failed to open file list for writing: $tmp_filelist ($!)");
    # randomize the file order so each batch gets a roughly even mix of
    # cheap and expensive documents
    my @filenames = shuffle(keys %{$block_hash->{'all_files'}});
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      # skip files blocked by the plugins, and metadata.xml files (these are
      # consumed as metadata rather than imported as documents)
      if ((!exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close($filelist);
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    $mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';
  # fix for mpi binding to incorrect interface device (seems to have an
  # unhealthy obsession with virbr0)
  $mpi_flags .= ' --mca btl tcp,sm,self --mca btl_tcp_if_include eth0 ';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find this executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';

  # -n is workers + 1 to account for the master (farmer) process itself
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # stream the farmer's output through to our own stdout as it arrives;
  # '-|' pipe-open with a lexical handle replaces the bareword "cmd |" form
  open(my $mpi_out, '-|', $mpi_cmd) or die("Couldn't Execute MPI");
  while (defined(my $line = <$mpi_out>))
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_out);
}
# _farmOutProcesses()
255
256
## @function getSupportedArguments()
#
# Retrieve the list of arguments that are specific to this subclass of
# inexport so they can be added to the list of supported arguments to
# import.pl. The use of any of these arguments automatically causes this
# subclass to be instantiated and used in preference to the parent class.
# ATM it is up to the implementer to ensure these arguments are unique.
#
# @return reference to the argument specification list declared above
#
sub getSupportedArguments
{
  return $arguments;
}
## getSupportedArguments() ##
270
271
272################################################################################
273##### Overrides
274################################################################################
275
276
## @function perform_process_files()
#
# Dispatch the actual document processing: farm the files out over MPI when
# one or more workers are configured, otherwise fall back to the serial
# implementation in inexport. Afterwards, when running in cluster mode
# (mpi.conf present) with an infodb type that supports merging, fold each
# host's per-node archive databases back into a single database.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel Import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Merge the individual parts into one single database
  my $infodb_type = $self->{'collectcfg'}->{'infodbtype'};
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  if (-f $mpi_conf_path && &dbutil::supportsMerge($infodb_type))
  {
    print STDERR ' * Merging ' . $infodb_type . ' databases... ';
    my @databases = ('archiveinf-src', 'archiveinf-doc');
    foreach my $database (@databases)
    {
      # generate the path to the target database without any hostname suffix
      my $archive_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, '');
      # lexical handle replaces the old bareword MPIHOSTS
      open(my $mpihosts, '<:utf8', $mpi_conf_path) or die("Failed to open mpi.conf for reading");
      while (my $line = <$mpihosts>)
      {
        # each machine line in mpi.conf is expected to begin with a hostname
        if ($line =~ /^([a-z0-9\-]+)/i)
        {
          my $hostname = $1;
          # per-host databases carry the hostname as a suffix in their path
          my $mergable_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, $hostname);
          if (-f $mergable_db_path)
          {
            &dbutil::merge_databases($infodb_type, $mergable_db_path, $archive_db_path);
          }
        }
      }
      close($mpihosts);
    }
    print "Done!\n";
  }
}
## perform_process_files() ##
336
## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile   = $self->{'statsfile'};
  my $out         = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli         = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three-argument open so mode characters in $statsfile can't be
    # misinterpreted (the old ">$statsfile" form allowed mode injection)
    if (open (STATS, '>', $statsfile))
    {
      # the bareword handle lives in *this* package's symbol table, so the
      # string handle name must be qualified accordingly (it was previously
      # mislabelled 'inexport::STATS', a glob that was never opened)
      $statsfile = 'parallelbuildinginexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {
    # nothing to do yet - see the note above
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
395
396
## @function makeHeader($msg, [$length])
#
# Create a centered header string given a certain message padded with '='
# characters, e.g. "===== Some Message =====".
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string of exactly $length characters (unless $msg itself is too
#         long to fit) with $msg centered and '=' as padding
#
sub makeHeader
{
  my ($msg, $length) = @_;
  $length = 80 unless defined $length;
  # total '=' padding to distribute either side of " $msg " (two spaces);
  # computing left/right independently fixes the old off-by-one where an
  # odd message length combined with a non-default even/odd $length could
  # produce a string one character too long (e.g. makeHeader('abc', 11))
  my $padding = $length - 2 - length($msg);
  $padding = 0 if $padding < 0; # message wider than requested length
  my $left = int($padding / 2);
  my $right = $padding - $left; # right side absorbs any odd remainder
  return ('=' x $left) . ' ' . $msg . ' ' . ('=' x $right);
}
## makeHeader() ##
4251;
Note: See TracBrowser for help on using the browser.