source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm@ 28770

Last change on this file since 28770 was 28770, checked in by jmt12, 8 years ago

Adding microtiming. This is a little tricky because TDBServer takes a long time to shut down, so the duration calculation is performed in the END block; this ensures the reported time covers the entire run, including shutdown.

File size: 11.4 KB
Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40use Time::HiRes qw( gettimeofday tv_interval );
41
42# Greenstone Modules
43use gsprintf 'gsprintf';
44use inexport;
45
46our $start_time;
47
# Register this package as a subclass of inexport so that any method not
# overridden here falls through to the standard (serial) implementation.
BEGIN
{
  @parallelbuildinginexport::ISA = ('inexport');
}
52
# At interpreter shutdown, report the total wall-clock duration of the run.
# The timer is started in new(), so nothing is printed unless this subclass
# was actually instantiated. Placed in END so the time includes slow
# shutdown phases (e.g. waiting on TDBServer).
END
{
  if (defined $start_time)
  {
    my $elapsed = tv_interval($start_time, [&gettimeofday()]);
    printf ' Duration: %0.6f' . "\n", $elapsed;
  }
}
62
# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
#
# Argument specifications exposed to import.pl via getSupportedArguments().
# Supplying any of these options causes this subclass to be used in place of
# plain inexport. 'jobs' and 'epoch' are retained only for backwards
# compatibility (hence marked 'internal') and are mapped onto 'workers' and
# 'batchsize' respectively in new().
my $arguments = [
  { 'name' => "workers",
    'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
    'type' => "int",
    'range' => "0,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "batchsize",
    'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "nolocal",
    'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
    'type' => "flag",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "jobs",
    'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  { 'name' => "epoch",
    'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  ];
98
99
# @function new()
#
# Constructor. Chains to the inexport constructor, then normalises the
# parallel-processing options: the legacy 'jobs' and 'epoch' arguments are
# mapped onto 'workers' and 'batchsize', and both are sanity checked. When
# at least one worker is available a parallel-import banner is printed and
# -removeold is forced, since the MPI workers rebuild the archives
# directory from scratch.
#
sub new
{
  my $class = shift(@_);
  # direct method call rather than indirect object syntax ("new inexport")
  my $self = inexport->new(@_);

  # record when the import started so the END block can report the total
  # wall-clock duration of the run
  $start_time = [&gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch. Note workers = jobs - 1 because one
  # MPI rank is consumed by the master/farmer process. The defined() guards
  # avoid 'uninitialized value' warnings when the options are unset.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '') && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '') && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print &makeHeader($message) . "\n";
    # zero-pad the microseconds component so the timestamp reads correctly
    # (previously 1.000056 seconds would print as "1.56")
    printf ' Started: %d.%06d' . "\n", $start_time->[0], $start_time->[1];
    print ' Workers: ' . $self->{'workers'} . "\n";
    print ' Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 79 . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); # give the user a moment to abort, just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
151
152
## @function deinit() - placeholder only; no deinit() is implemented in this subclass
154
155
# @function _farmOutProcesses()
#
# Index the files in parallel, using mpirun to farm batches of documents out
# to multiple mpiimport worker processes. Writes the (shuffled) list of files
# to import into <collection>/tmp/filelist.txt, then launches the MPI farmer
# and echoes its output.
# @author hs, 1 july 2010
#
# @param $workers    number of worker ranks to request (master adds one more)
# @param $batchsize  documents per batch handed to each worker
# @param $importdir  root directory of the files being imported
# @param $block_hash hashref with 'all_files' and 'file_blocks' entries
# @param $collection the collection name
# @param $site       GS3 site name, or empty/undef for GS2
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    # fail fast here rather than at the filelist open below
    mkdir($tmp_dir_path, 0777) or die("Error! Failed to create directory: " . $tmp_dir_path);
  }

  # create the list of files to import
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours.
  #   NOTE(review): $overwrite is hard-coded to 1, so the cache branch is
  #   currently never taken - presumably deliberate outside of testing.
  if ($overwrite || !-f $tmp_filelist)
  {
    # three-argument open with error checking (was unchecked two-argument)
    open(my $filelist, '>', $tmp_filelist) or die("Error! Failed to open file for writing: " . $tmp_filelist);
    # randomise the order so expensive documents are (probabilistically)
    # spread evenly across the worker batches
    my @filenames = shuffle(keys %{$block_hash->{'all_files'}});
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir,$filename);
      # skip blocked files and metadata.xml files
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    # a failed close on a write handle means buffered data was lost
    close($filelist) or die("Error! Failed to close file: " . $tmp_filelist);
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    #$mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    # Greenstone 3 layout
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find this executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = 'mpiimport';

  # one extra rank beyond $workers for the master/farmer process
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # lexical handle and explicit '-|' pipe mode (was bareword two-argument
  # open); the command string is still shell-interpreted, as before
  open(my $mpi_handle, '-|', $mpi_cmd) or die("Couldn't Execute MPI");
  # echo the farmer's output line by line as it arrives
  while ( defined( my $line = <$mpi_handle> ) )
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_handle);
}
# _farmOutProcesses()
237
238
## @function getSupportedArguments()
#
# Retrieve the list of arguments that are specific to this subclass of inexport
# so they can be added to the list of supported arguments to import.pl. The use
# of any of these arguments automatically causes this subclass to be
# instantiated and used in preference to the parent class. ATM it is up to the
# implementer to ensure these arguments are unique.
#
# @return A reference to the module-level $arguments array (not a copy)
#
sub getSupportedArguments
{
  return $arguments;
}
## getSupportedArguments() ##
252
253
254################################################################################
255##### Overrides
256################################################################################
257
258
## @function perform_process_files()
#
# Dispatch the actual document processing: with one or more workers
# configured the import is farmed out over MPI, otherwise the parent
# class's serial implementation is used unchanged. Arguments are passed
# straight through in either case.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;

  my $worker_count = $self->{'workers'};

  # Serial import - defer entirely to the parent implementation
  if ($worker_count < 1)
  {
    return $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Parallel import - hand off to the MPI farmer (function written by HS)
  $self->_farmOutProcesses($worker_count, $self->{'batchsize'}, $importdir, $block_hash, $self->{'collection'}, $self->{'site'});
}
## perform_process_files() ##
283
## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
#
# @param $pluginfo the plugin information structure (currently unused while
#        the write_stats call below remains commented out)
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three-argument open (was unchecked two-argument) on the package STATS
    # handle; on failure fall back to STDERR as before
    if (open (STATS, '>', $statsfile))
    {
      # NOTE(review): the bareword handle opened above actually lives in
      # THIS package (parallelbuildinginexport::STATS), not inexport::STATS
      # as the string below claims. Harmless while the write_stats call is
      # commented out, but verify before re-enabling it.
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {

  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
342
343
## @function makeHeader($msg, [$length])
#
# Create a centered header string: the message, surrounded by single spaces,
# padded on both sides with '=' characters to a total width of $length.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 79
# @return A string centered with '=' as padding
#
sub makeHeader
{
  my ($msg, $length) = @_;
  if (!defined $length)
  {
    $length = 79; # 80 with newline
  }
  # Split the '=' padding between left and right, giving the right side the
  # extra character when it cannot be split evenly. (The previous version
  # tested length($msg) % 2, which only balanced correctly for odd $length
  # values such as the default 79; for even $length it was one character off.)
  my $padding = $length - 2 - length($msg);
  my $left_length = int($padding / 2);
  my $right_length = $padding - $left_length;
  return ('=' x $left_length) . ' ' . $msg . ' ' . ('=' x $right_length);
}
## makeHeader() ##

1;
Note: See TracBrowser for help on using the repository browser.