root/gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm @ 28001

Revision 28001, 11.9 KB (checked in by jmt12, 6 years ago)

Write datestamp using dbutil if applicable

Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the  GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40
41# Greenstone Modules
42use gsprintf 'gsprintf';
43use inexport;
44
45BEGIN
46{
47  @parallelbuildinginexport::ISA = ('inexport');
48}
49
# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
# Argument specifications understood by import.pl; returned verbatim by
# getSupportedArguments() below. 'jobs' and 'epoch' are legacy spellings
# that new() maps onto 'workers' and 'batchsize' respectively (note that
# workers = jobs - 1, as 'jobs' counted the master process too). All are
# hidden from GLI ('hiddengli') as parallel import is a command-line feature.
my $arguments = [
                 # number of worker (slave) processes to spawn; 0 disables
                 # parallel processing
                 { 'name' => "workers",
                   'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
                   'type' => "int",
                   'range' => "0,",
                   'reqd' => "no",
                   'hiddengli' => "yes" },
                 # documents handed to a worker per allocation
                 { 'name' => "batchsize",
                   'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
                   'type' => "int",
                   'range' => "1,",
                   'reqd' => "no",
                   'hiddengli' => "yes" },
                 # cluster-only mode: don't run workers on this node
                 { 'name' => "nolocal",
                   'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
                   'type' => "flag",
                   'reqd' => "no",
                   'hiddengli' => "yes" },
                 # legacy alias: workers = jobs - 1
                 { 'name' => "jobs",
                   'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
                   'type' => "int",
                   'range' => "1,",
                   'reqd' => "no",
                   'internal' => "1",
                   'hiddengli' => "yes" },
                 # legacy alias for batchsize
                 { 'name' => "epoch",
                   'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
                   'type' => "int",
                   'range' => "1,",
                   'reqd' => "no",
                   'internal' => "1",
                   'hiddengli' => "yes" },
                ];
85
# @function new()
# Constructor. Extends inexport's constructor with the parallel processing
# options: maps the legacy 'jobs'/'epoch' arguments onto 'workers'/'batchsize'
# and sanity-checks the resulting values, falling back to a serial import
# (workers = 0) when parallel processing isn't viable.
#
sub new
{
  my $class = shift(@_);
  # Explicit arrow call rather than the ambiguous indirect object syntax
  # ("new inexport(...)") used previously
  my $self = inexport->new(@_);

  # Legacy support: 'jobs' counted the master process as well, hence the - 1;
  # 'epoch' is a straight synonym for 'batchsize'. The defined() guards avoid
  # operating on uninitialised values when an option wasn't supplied at all.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '') && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '') && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "WARNING: batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "WARNING: parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    print "Performing Parallel Import: workers:" . $self->{'workers'} . ", batchsize:" . $self->{'batchsize'} . "\n";
    # Workers write to the archives directory independently, so an existing
    # archive can't be incrementally updated - force a full rebuild
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
129
# @function _farmOutProcesses()
# Index the files in parallel using the MPI farmer to farm off multiple
# processes
# @param $workers number of worker (slave) processes to spawn
# @param $batchsize number of documents allocated to a worker at a time
# @param $importdir path to the directory of source documents
# @param $block_hash hashref listing all source files ('all_files') and any
#        blocked files ('file_blocks')
# @param $collection the collection name
# @param $site the GS3 site name (empty or undef for GS2 collections)
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777);
  }

  # create the list of files to import
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours.
  #   NOTE(review): $overwrite is hard-coded to 1 above, so as written the
  #   filelist is always regenerated - confirm whether that is intended.
  if ($overwrite || !-f $tmp_filelist)
  {
    # three-argument open with an error check; the previous two-argument,
    # unchecked open would continue silently with an unopened handle
    open (my $filelist, '>', $tmp_filelist) or die("Couldn't open filelist for writing: $tmp_filelist ($!)");
    # Randomize the order of files in the filelist so expensive documents are
    # spread across the worker batches rather than clumped together
    my @filenames = keys %{$block_hash->{'all_files'}};
    @filenames = shuffle(@filenames);
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir,$filename);
      # skip blocked files and metadata.xml (handled by the plugins directly)
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close ($filelist);
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    #$mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find these executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = 'mpiimport';

  # mpirun launches $workers + 1 processes: one master plus the workers
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # lexical filehandle and explicit pipe-open mode instead of the old bareword
  # MPI handle; echo the farmer's output as it arrives
  open(my $mpi_fh, '-|', $mpi_cmd) or die("Couldn't Execute MPI");
  while ( defined( my $line = <$mpi_fh> ) )
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_fh);
}
# _farmOutProcesses()
211
# @function getSupportedArguments
# Retrieve the list of arguments that are specific to this subclass of inexport
# so they can be added to the list of supported arguments to import.pl. The
# use of any of these arguments automatically causes this subclass to be
# instantiated and used in preference to the parent class. ATM it is up to the
# implementer to ensure these arguments are unique.
# @return arrayref of argument specification hashes (the module-level
#         $arguments defined above; callers must not mutate it)
sub getSupportedArguments
{
  return $arguments;
}
# getSupportedArguments()
223
224################################################################################
225##### Overrides
226################################################################################
227
228
## @function perform_process_files()
#
# Override of inexport::perform_process_files. When one or more worker
# processes have been requested the file list is handed off to the MPI
# farmer for parallel processing; otherwise the parent's serial
# implementation is used unchanged.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;

  if ($self->{'workers'} > 0)
  {
    # Parallel import - dispatch to the MPI farmer (written by HS)
    $self->_farmOutProcesses($self->{'workers'}, $self->{'batchsize'}, $importdir, $block_hash, $self->{'collection'}, $self->{'site'});
  }
  else
  {
    # Serial import - defer entirely to inexport
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }
}
## perform_process_files() ##
253
## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @param $pluginfo the plugin information list (currently unused while the
#        write_stats call below remains commented out)
# @author hs, 1 july 2010
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile   = $self->{'statsfile'};
  my $out         = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli         = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three-argument open; the previous two-argument form interpolated
    # $statsfile into the mode string, risking mode injection via the filename
    if (open (STATS, '>', $statsfile))
    {
      # NOTE(review): this handle lives in parallelbuildinginexport, yet the
      # name stored for write_stats says 'inexport::STATS' - harmless while
      # the write_stats call below is commented out, but verify before
      # re-enabling it
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {
    # deliberately empty - see the note above
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
312
313
## @function recordEarliestDatestamp()
#
# Record the collection's earliest datestamp (the current time) so that
# buildcol can later read it back and set earliestDatestamp in GS2's
# build.cfg / GS3's buildconfig.xml. When the infodb driver supports
# datestamps the value is appended to a 'datestamp' infodb in archivesdir;
# otherwise a plain "earliestDatestamp" file is written there - but only if
# it doesn't already exist (it is deliberately preserved across incremental
# builds). See also set_oaiLastModified in doc.pm, which populates the
# oailastmodified and oailastmodifieddate document fields.
#
sub recordEarliestDatestamp
{
  my $self = shift(@_);
  my ($infodbtype, $archivedir) = @_;
  my $now = time; # current time, in seconds since the epoch

  # Preferred path: store the datestamp in the infodb when the driver
  # supports it (the defined() test guards against older dbutil versions
  # that lack supportsDatestamp entirely)
  if (defined(&dbutil::supportsDatestamp) && &dbutil::supportsDatestamp($infodbtype))
  {
    my $db_path = &dbutil::get_infodb_file_path($infodbtype, 'datestamp', $archivedir);
    my $db_handle = &dbutil::open_infodb_write_handle($infodbtype, $db_path, 'append');
    &dbutil::write_infodb_rawentry($infodbtype, $db_handle, 'earliest', $now);
    &dbutil::close_infodb_write_handle($infodbtype, $db_handle);
    return;
  }

  # Fallback path: a flat "earliestDatestamp" file within the archives
  # directory. Leave any existing file untouched (incremental build), and
  # don't attempt to write if the archives directory itself is missing.
  my $datestamp_path = &FileUtils::filenameConcatenate($archivedir, "earliestDatestamp");
  if (!&FileUtils::fileExists($datestamp_path) && &FileUtils::directoryExists($archivedir))
  {
    my $fh;
    if (&FileUtils::openFileHandle($datestamp_path, 'w', \$fh))
    {
      print $fh $now;
      &FileUtils::closeFileHandle($datestamp_path, \$fh);
    }
    else
    {
      &gsprintf(STDERR, "{import.cannot_write_earliestdatestamp}\n", $datestamp_path);
    }
  }
}
## recordEarliestDatestamp() ##
356
3571;
Note: See TracBrowser for help on using the browser.