source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm @ 28001

Last change on this file since 28001 was 28001, checked in by jmt12, 11 years ago

Write datestamp using dbutil if applicable

File size: 11.9 KB
###############################################################################
#
# parallelbuildinginexport.pm -- support parallel import.pl by extending
# inexport with parallel processing awareness and functionality
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# @author Hussein Suleman [h1], University of Cape Town, South Africa
# @author John Thompson [jmt12], Greenstone DL Research Group

package parallelbuildinginexport;

# Pragma
use strict;
no strict 'refs'; # allow filehandles to be variables and vice versa
no strict 'subs'; # allow barewords (eg STDERR) as function arguments

# MODULES
# Randomize the order of files in the filelist
use List::Util qw( shuffle );

# Greenstone Modules
use gsprintf 'gsprintf';
use inexport;

BEGIN
{
  @parallelbuildinginexport::ISA = ('inexport');
}

# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
my $arguments = [
  { 'name' => "workers",
    'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
    'type' => "int",
    'range' => "0,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "batchsize",
    'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "nolocal",
    'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple-machine imports only",
    'type' => "flag",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "jobs",
    'desc' => "**Parallel Processing** Legacy support - see 'workers' above, where workers equals jobs - 1",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  { 'name' => "epoch",
    'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
];
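
# A minimal usage sketch (hypothetical collection name and values): these
# options are given on the import.pl command line, which passes them through
# to this subclass, e.g.
#
#   import.pl -removeold -workers 4 -batchsize 20 mycollection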

# @function new()
# Constructor
#
sub new
{
  my $class = shift(@_);
  my $self = new inexport(@_);

  # Legacy support
  if ($self->{'workers'} eq '' && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ($self->{'batchsize'} eq '' && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }
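
  # For example, a legacy invocation of -jobs 5 maps to -workers 4 (one
  # farmer process plus four workers), and -epoch maps directly to -batchsize.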

  # Sanity Check
  if ($self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "WARNING: batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if ($self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "WARNING: parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    print "Performing Parallel Import: workers:" . $self->{'workers'} . ", batchsize:" . $self->{'batchsize'} . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); # just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()

# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777);
  }

  # create the list of files to import
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate it. This is especially important for imports of a
  #   million documents, as the directory scan alone can take several hours.
  if ($overwrite || !-f $tmp_filelist)
  {
    open (my $filelist, ">$tmp_filelist");
    my @filenames = keys %{$block_hash->{'all_files'}};
    @filenames = shuffle(@filenames);
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      if ((!exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close ($filelist);
  }
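
  # The resulting filelist.txt holds one filename per line, in shuffled order
  # (presumably so each batch gets an even mix of work), e.g. with
  # hypothetical filenames:
  #
  #   html/doc0421.html
  #   pdf/report07.pdf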

  # Determine if we've been provided an mpi.conf file indicating the other
  # machines (slave nodes) this parallelizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    #$mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
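
  # mpi.conf is a standard MPI machinefile; a minimal (hypothetical) example
  # lists one hostname per line:
  #
  #   node01
  #   node02
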
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume the path is correct for finding these executables (as
  # they will be under the new extensions framework)
  my $farmer_exe = 'mpiimport';

  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";
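
  # For illustration only (hypothetical paths and worker count), the
  # assembled command resembles:
  #
  #   mpirun -machinefile "/gsdl/collect/demo/mpi.conf" --show-progress \
  #     --timestamp-output --verbose -n 5 mpiimport \
  #     /gsdl/collect/demo/tmp/filelist.txt 20 /gsdl demo
  #
  # with the GS3 site name (empty for GS2) appended as a final argument.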

  open(MPI, $mpi_cmd . " |") or die("Couldn't Execute MPI");
  while (defined(my $line = <MPI>))
  {
    chomp($line);
    print "$line\n";
  }
  close(MPI);
}
# _farmOutProcesses()

# @function getSupportedArguments()
# Retrieve the list of arguments that are specific to this subclass of
# inexport so they can be added to the list of supported arguments to
# import.pl. The use of any of these arguments automatically causes this
# subclass to be instantiated and used in preference to the parent class. At
# present it is up to the implementer to ensure these arguments are unique.
sub getSupportedArguments
{
  return $arguments;
}
# getSupportedArguments()
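
# For illustration (a hypothetical caller), import.pl could splice these
# options in alongside its own:
#
#   my $parallel_args = &parallelbuildinginexport::getSupportedArguments();
#   push(@{$import_options}, @{$parallel_args});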

################################################################################
##### Overrides
################################################################################


## @function perform_process_files()
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }
}
## perform_process_files() ##

## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file; only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    if (open (STATS, ">$statsfile"))
    {
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads' stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##


## @function recordEarliestDatestamp()
#
# Check for the existence of the file that is to contain earliestDatestamp in
# archivesdir. Do nothing if the file already exists (it does on an
# incremental build). If the file doesn't exist, as happens on a full build,
# create it and write the current datestamp into it. In buildcol, read the
# file's contents and set earliestDatestamp in GS2's build.cfg / GS3's
# buildconfig.xml. In doc.pm, have set_oaiLastModified (similar to
# set_lastmodified) create the doc fields oailastmodified and
# oailastmodifieddate.
#
sub recordEarliestDatestamp
{
  my $self = shift(@_);
  my ($infodbtype, $archivedir) = @_;
  my $current_time_in_seconds = time; # in seconds
  # We use the infodb to store this information if it supports it
  if (defined(&dbutil::supportsDatestamp) && &dbutil::supportsDatestamp($infodbtype))
  {
    my $datestamp_db = &dbutil::get_infodb_file_path($infodbtype, 'datestamp', $archivedir);
    my $datestamp_db_fh = &dbutil::open_infodb_write_handle($infodbtype, $datestamp_db, 'append');
    &dbutil::write_infodb_rawentry($infodbtype, $datestamp_db_fh, 'earliest', $current_time_in_seconds);
    &dbutil::close_infodb_write_handle($infodbtype, $datestamp_db_fh);
  }
  else
  {
    my $earliestDatestampFile = &FileUtils::filenameConcatenate($archivedir, "earliestDatestamp");
    if (!&FileUtils::fileExists($earliestDatestampFile) && &FileUtils::directoryExists($archivedir))
    {
      my $datestamp_fh;
      if (&FileUtils::openFileHandle($earliestDatestampFile, 'w', \$datestamp_fh))
      {
        print $datestamp_fh $current_time_in_seconds;
        &FileUtils::closeFileHandle($earliestDatestampFile, \$datestamp_fh);
      }
      else
      {
        &gsprintf(STDERR, "{import.cannot_write_earliestdatestamp}\n", $earliestDatestampFile);
      }
    }
  }
}
## recordEarliestDatestamp() ##

1;