source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm@ 28779

Last change on this file since 28779 was 28779, checked in by jmt12, 10 years ago

Making timing message all sorts of purty

File size: 11.6 KB
Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40use Time::HiRes qw( gettimeofday tv_interval );
41
42# Greenstone Modules
43use gsprintf 'gsprintf';
44use inexport;
45
46our $start_time;
47
# Establish inheritance at compile time: this package extends inexport,
# adding parallel (MPI-based) processing awareness to import.pl
BEGIN
{
  @parallelbuildinginexport::ISA = ('inexport');
}
52
# On interpreter shutdown, report the total wall-clock duration of the
# import - but only if the constructor actually ran and recorded a start
# time (otherwise this module was loaded but never used)
END
{
  if (defined $start_time)
  {
    # gettimeofday() in list context returns (seconds, microseconds)
    my $end_time = [&gettimeofday()];
    my $duration = tv_interval($start_time, $end_time);
    print &makeHeader('Parallel Import Complete') . "\n";
    print ' Ended: ' . @{$end_time}[0] . '.' . @{$end_time}[1] . "\n";
    print ' Duration: ' . sprintf('%0.6f', $duration) . "\n";
    print '=' x 80 . "\n";
  }
}
65
# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
# These descriptors are exposed via getSupportedArguments() and merged into
# import.pl's option list. 'jobs' and 'epoch' are the legacy spellings; they
# are mapped onto 'workers' and 'batchsize' in new().
my $arguments = [
  { 'name' => "workers",
    'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
    'type' => "int",
    'range' => "0,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "batchsize",
    'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "nolocal",
    'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
    'type' => "flag",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "jobs",
    'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  { 'name' => "epoch",
    'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  ];
101
102
# @function new()
# Constructor. Builds the underlying inexport object, maps the legacy
# 'jobs'/'epoch' options onto 'workers'/'batchsize', and sanity-checks the
# parallel settings - falling back to a serial import (workers = 0) when no
# usable worker count was supplied.
#
sub new
{
  my $class = shift(@_);
  my $self = new inexport(@_);

  # Recorded so the END block can report total import duration
  $start_time = [&gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch. Guard with defined() so unset options
  # don't raise uninitialized-value warnings under 'use warnings'.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '')
      && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    # workers = jobs - 1 because one MPI rank acts as the master/farmer
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '')
      && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print &makeHeader($message) . "\n";
    print ' Started: ' . @{$start_time}[0] . '.' . @{$start_time}[1] . "\n";
    print ' Workers: ' . $self->{'workers'} . "\n";
    print ' Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 80 . "\n";
    # Parallel workers each write into the archives directory, so stale
    # content from a previous build must be removed first
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
154
155
156## @function deinit()
157
158
# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes
# @author hs, 1 july 2010
#
# @param $workers    number of worker processes (mpirun gets workers + 1 ranks)
# @param $batchsize  documents per batch handed to each worker
# @param $importdir  root directory of the files to import
# @param $block_hash hash with 'all_files' and 'file_blocks' entries
# @param $collection collection name passed through to the farmer executable
# @param $site       GS3 site name, or empty/undef for a GS2 install
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777);
  }

  # create the list of files to import
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours.
  # - NOTE: $overwrite is currently hard-coded on, so the list is always
  #   regenerated; set it to 0 to reuse an existing filelist.
  if ($overwrite || !-f $tmp_filelist)
  {
    # three-argument open with an error check (the old two-argument form
    # interpolated the path into the mode string and silently ignored failure)
    open(my $filelist, '>', $tmp_filelist) or die("Couldn't open $tmp_filelist for writing: $!");
    # randomise the order so each worker's batch gets a mix of documents
    my @filenames = keys %{$block_hash->{'all_files'}};
    @filenames = shuffle(@filenames);
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir,$filename);
      # skip blocked files and metadata.xml files
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    # surface buffered write errors rather than losing part of the filelist
    close($filelist) or die("Couldn't close $tmp_filelist: $!");
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallelizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    #$mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find these executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';

  # -n is workers + 1 because one rank is the farmer itself
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # lexical filehandle with an explicit '-|' pipe-read mode instead of a
  # bareword handle with a trailing " |"; relay the farmer's output as we go
  open(my $mpi_fh, '-|', $mpi_cmd) or die("Couldn't Execute MPI");
  while ( defined( my $line = <$mpi_fh> ) )
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_fh);
}
# _farmOutProcesses()
240
241
## @function getSupportedArguments()
#
# Retrieve the list of arguments that are specific to this subclass of inexport
# so they can be added to the list of supported arguments to import.pl. The use
# of any of these arguments automatically causes this subclass to be
# instantiated and used in preference to the parent class. ATM it is up to the
# implementer to ensure these arguments are unique.
#
# @return A reference to the array of argument descriptors declared above
#
sub getSupportedArguments
{
  return $arguments;
}
## getSupportedArguments() ##
255
256
257################################################################################
258##### Overrides
259################################################################################
260
261
## @function perform_process_files()
#
# Override of inexport::perform_process_files that dispatches either to the
# MPI farmer (when one or more workers have been configured) or falls back
# to the inherited serial implementation. Same interface as the parent.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;

  # Serial import - no worker threads requested, so let the parent class
  # process everything in this process
  if ($self->{'workers'} <= 0)
  {
    return $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Parallel import - hand the work over to the MPI farmer (written by HS)
  $self->_farmOutProcesses($self->{'workers'}, $self->{'batchsize'}, $importdir, $block_hash, $self->{'collection'}, $self->{'site'});
}
## perform_process_files() ##
286
## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
#
# @param $pluginfo plugin information (currently unused - see note below)
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three-argument open so a crafted stats path can't inject an open mode;
    # STATS stays a package filehandle because its fully-qualified name is
    # recorded in $statsfile below
    if (open (STATS, '>', $statsfile))
    {
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {
    # intentionally empty - see note above
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
345
346
## @function makeHeader($msg, [$length])
#
# Create a centered header string given a certain message padded with '=' characters.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string of exactly $length characters with $msg centered and '='
#         as padding (unless $msg itself is too long to fit)
#
sub makeHeader
{
  my ($msg, $length) = @_;
  if (!defined $length)
  {
    $length = 80;
  }
  # Padding characters available once the message and its two surrounding
  # spaces are accounted for (clamped at zero for over-long messages)
  my $padding = $length - 2 - length($msg);
  $padding = 0 if ($padding < 0);
  # Split the padding across the two sides, putting any odd leftover '=' on
  # the right. This always yields exactly $length characters; the previous
  # parity test on length($msg) produced a short string for odd $length.
  my $left = int($padding / 2);
  my $right = $padding - $left;
  return ('=' x $left) . ' ' . $msg . ' ' . ('=' x $right);
}
## makeHeader() ##
3751;
Note: See TracBrowser for help on using the repository browser.