root/gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm @ 29276

Revision 29276, 13.7 KB (checked in by jmt12, 5 years ago)

I need to measure the time spent on generating the initial manifest, as serial processing currently doesn't do this.

Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the  GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40use Time::HiRes  qw( gettimeofday tv_interval );
41
42# Greenstone Modules
43use gsprintf 'gsprintf';
44use inexport;
45
46our $start_time;
47our $filelist_duration;
48
BEGIN
{
  # Register inexport as our parent class so any method we don't override
  # falls through to the serial implementation.
  @parallelbuildinginexport::ISA = qw( inexport );
}
53
END
{
  # Emit wall-clock statistics for the whole import run, but only when our
  # constructor actually ran (i.e. this parallel subclass was instantiated).
  if (defined $start_time)
  {
    my $finish_time = [&gettimeofday()];
    my $elapsed = tv_interval($start_time, $finish_time);
    print &makeHeader('Parallel Import Complete') . "\n";
    print '  Ended:    ' . $finish_time->[0] . '.' . $finish_time->[1] . "\n";
    # Only reported when the parallel path generated a raw manifest
    if (defined $filelist_duration)
    {
      print '  Generating raw manifest: ' . sprintf('%0.6f', $filelist_duration) . "\n";
    }
    print '  Duration: ' . sprintf('%0.6f', $elapsed) . "\n";
    print '=' x 80 . "\n";
  }
}
70
# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
# Argument specifications exposed to import.pl via getSupportedArguments().
# 'jobs' and 'epoch' are retained (hidden, internal) purely for backwards
# compatibility; new() maps them onto 'workers' and 'batchsize' respectively.
my $arguments = [
                 { 'name' => "workers",
                   'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
                   'type' => "int",
                   'range' => "0,",
                   'reqd' => "no",
                   'hiddengli' => "yes" },
                 { 'name' => "batchsize",
                   'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
                   'type' => "int",
                   'range' => "1,",
                   'reqd' => "no",
                   'hiddengli' => "yes" },
                 { 'name' => "nolocal",
                   'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
                   'type' => "flag",
                   'reqd' => "no",
                   'hiddengli' => "yes" },
                 { 'name' => "jobs",
                   'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
                   'type' => "int",
                   'range' => "1,",
                   'reqd' => "no",
                   'internal' => "1",
                   'hiddengli' => "yes" },
                 { 'name' => "epoch",
                   'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
                   'type' => "int",
                   'range' => "1,",
                   'reqd' => "no",
                   'internal' => "1",
                   'hiddengli' => "yes" },
                ];
106
107
# @function new()
# Constructor. Builds the underlying inexport object, records the import
# start time (reported by the END block), normalizes the parallel-processing
# arguments (workers/batchsize, with legacy jobs/epoch aliases) and decides
# between the parallel and serial code paths.
#
sub new
{
  my $class = shift(@_);
  # direct method call rather than the old indirect-object "new inexport"
  my $self = inexport->new(@_);

  $start_time = [&gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch. Guard with defined() so we never
  # string-compare or regex-match uninitialized values when the options
  # were omitted entirely.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '')
      && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    # 'jobs' counted the master process as well, hence the - 1
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '')
      && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    # zero workers means we fall back to the serial (parent) implementation
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print &makeHeader($message) . "\n";
    print '  Started:   ' . @{$start_time}[0] . '.' . @{$start_time}[1] . "\n";
    print '  Workers:   ' . $self->{'workers'} . "\n";
    print '  Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 80 . "\n";
    # Parallel workers each write into the archives directory, so any stale
    # content from a previous import must go first.
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
159
160
## @function set_collection_options()
#
# Delegate to the parent implementation, additionally keeping a reference to
# the collection configuration on $self (perform_process_files later reads
# the 'infodbtype' entry from it when merging per-host databases).
#
sub set_collection_options
{
  my ($self, $collectcfg) = @_;
  $self->SUPER::set_collection_options($collectcfg);
  $self->{'collectcfg'} = $collectcfg;
}
## set_collection_options() ##
171
172
173## @function deinit()
174
175
# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes.
# Writes a (shuffled) manifest of files to import into tmp/filelist.txt, then
# invokes the mpiimport farmer over that list via mpirun, echoing its output.
# @param $workers    number of worker (slave) processes to spawn
# @param $batchsize  number of documents per batch handed to a worker
# @param $importdir  root directory the (relative) filelist entries live under
# @param $block_hash hashref with 'all_files' (import candidates) and
#                    'file_blocks' (full paths to exclude)
# @param $collection the collection name, passed through to the farmer
# @param $site       GS3 site name, or undef/'' on a GS2 install
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777) or die("Couldn't create tmp directory: $tmp_dir_path");
  }

  # create the list of files to import, timing how long the manifest takes to
  # generate (the serial code path doesn't measure this phase)
  my $filelist_start_time = [&gettimeofday()];
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours.
  #   NOTE(review): $overwrite is hard-coded to 1, so the reuse branch below
  #   is currently never taken.
  if ($overwrite || !-f $tmp_filelist)
  {
    # three-argument checked open (was an unchecked two-argument open)
    open(my $filelist, '>', $tmp_filelist) or die("Couldn't open filelist for writing: $tmp_filelist");
    # Randomize the processing order so expensive documents are (hopefully)
    # spread evenly between workers.
    my @filenames = shuffle(keys %{$block_hash->{'all_files'}});
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      # skip blocked files, and metadata.xml which the importer handles itself
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close($filelist) or die("Couldn't close filelist: $tmp_filelist");
  }
  my $filelist_end_time = [&gettimeofday()];
  $filelist_duration = tv_interval($filelist_start_time, $filelist_end_time);

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallelizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    $mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';
  # fix for mpi binding to incorrect interface device (seems to have an
  # unhealthy obsession with virbr0)
  $mpi_flags .= ' --mca btl tcp,sm,self --mca btl_tcp_if_include eth0 ';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find these executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';

  # -n is workers + 1 because rank 0 acts as the master/farmer process
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # lexical pipe handle (was the bareword MPI handle); stream the farmer's
  # output through to our own stdout as it arrives
  open(my $mpi_out, $mpi_cmd . ' |') or die("Couldn't Execute MPI");
  while (defined(my $line = <$mpi_out>))
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_out);
}
# _farmOutProcesses()
263
264
## @function getSupportedArguments()
#
# Return the argument specifications specific to this inexport subclass so
# import.pl can add them to its supported-options list. Supplying any of
# these options on the command line is what causes this subclass to be
# instantiated instead of plain inexport, so (per the current design) it is
# the implementer's responsibility to keep the names unique.
#
# @return arrayref of argument-specification hashrefs
#
sub getSupportedArguments
{
  return $arguments;
}
## getSupportedArguments() ##
278
279
280################################################################################
281##### Overrides
282################################################################################
283
284
## @function perform_process_files()
#
# Override of inexport::perform_process_files. With one or more workers the
# actual document processing is farmed out over MPI; with zero workers we
# fall back to the serial parent implementation. Afterwards, in cluster mode
# (mpi.conf present) and where the infodb type supports merging, the per-host
# archive databases written by the slave nodes are folded into the single
# master database.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel Import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Merge the individual (per-host) archive databases into one single database
  my $infodb_type = $self->{'collectcfg'}->{'infodbtype'};
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  if ( -f $mpi_conf_path && &dbutil::supportsMerge($infodb_type) )
  {
    print STDERR ' * Merging ' . $infodb_type . ' databases... ';
    my @databases = ('archiveinf-src','archiveinf-doc');
    foreach my $database (@databases)
    {
      # generate the path to the target database without any hostname suffix
      my $archive_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, '');
      # each line of mpi.conf names a slave host; look for that host's
      # suffixed database and merge it into the master (lexical handle
      # replaces the old bareword MPIHOSTS handle)
      open(my $mpihosts_fh, '<:utf8', $mpi_conf_path) or die("Failed to open mpi.conf for reading");
      while (my $line = <$mpihosts_fh>)
      {
        if ($line =~ /^([a-z0-9\-]+)/i)
        {
          my $hostname = $1;
          my $mergable_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, $hostname);
          if (-f $mergable_db_path)
          {
            &dbutil::merge_databases($infodb_type, $mergable_db_path, $archive_db_path);
          }
        }
      }
      close($mpihosts_fh);
    }
    # keep the completion message on the same stream as the " * Merging"
    # prefix above (it previously went to STDOUT, splitting the line)
    print STDERR "Done!\n";
  }

}
## perform_process_files() ##
344
## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs.
# Worker processes have their output redirected to file, so only the master
# process writes to STDOUT/STDERR here. Per-worker stats aggregation is not
# yet implemented, so the detailed plugin stats call remains commented out
# and $pluginfo is currently unused.
# @param $pluginfo the plugin information structure
# @author hs, 1 july 2010
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile   = $self->{'statsfile'};
  my $out         = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli         = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three-argument open (was an injection-prone two-argument ">$statsfile")
    if (open (STATS, '>', $statsfile))
    {
      # NOTE(review): the bareword handle opened above lives in THIS package
      # (parallelbuildinginexport::STATS), not inexport - confirm this name
      # is correct if the write_stats call below is ever reinstated.
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {

  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
403
404
## @function makeHeader($msg, [$length])
#
# Create a centered header string for a given message, padded with '='
# characters to exactly $length characters.
#
# The original implementation split the padding by testing the parity of the
# message alone, which only produced a $length-wide result when $length was
# even. This version computes the left/right padding explicitly, so any
# $length comes out exact (and identical output is produced for the even
# $length values used elsewhere in this file). Messages wider than $length
# simply get no filler rather than a negative repeat count.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string centered with '=' as padding
#
sub makeHeader
{
  my ($msg, $length) = @_;
  if (!defined $length)
  {
    $length = 80;
  }
  # total filler needed around " $msg " to reach $length characters
  my $padding = $length - 2 - length($msg);
  if ($padding < 0)
  {
    $padding = 0;
  }
  # split the filler between the two sides; any odd remainder goes on the
  # right, matching the original output for even $length values
  my $left_length = int($padding / 2);
  my $right_length = $padding - $left_length;
  return ('=' x $left_length) . ' ' . $msg . ' ' . ('=' x $right_length);
}
## makeHeader() ##
4331;
Note: See TracBrowser for help on using the browser.