source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm@ 29257

Last change on this file since 29257 was 29257, checked in by jmt12, 10 years ago

Allow for collection configuration to be passed down to the parallel import process (so we can get infodbtype), and support infodbtypes that require post-import merging (such as TDBCluster).

File size: 13.4 KB
Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40use Time::HiRes qw( gettimeofday tv_interval );
41
42# Greenstone Modules
43use gsprintf 'gsprintf';
44use inexport;
45
# Timestamp recorded by the constructor; shared with the END block so the
# total wall-clock duration of the import can be reported at exit.
our $start_time;

BEGIN
{
    # Inherit everything from inexport; this subclass only overrides the
    # pieces needed for parallel processing.
    @parallelbuildinginexport::ISA = ('inexport');
}
52
END
{
    # Only emit the timing footer when the constructor actually ran and
    # recorded a start time (i.e. a parallel import was attempted).
    if (defined $start_time)
    {
        my $end_time = [&gettimeofday()];
        my $elapsed  = tv_interval($start_time, $end_time);
        print &makeHeader('Parallel Import Complete') . "\n";
        print ' Ended: ' . $end_time->[0] . '.' . $end_time->[1] . "\n";
        print ' Duration: ' . sprintf('%0.6f', $elapsed) . "\n";
        print '=' x 80 . "\n";
    }
}
65
# Argument definitions specific to parallel processing, merged into the
# supported arguments of import.pl via getSupportedArguments() below.
# 'jobs' and 'epoch' are legacy names (added by hs, 1 july 2010) kept for
# backwards compatibility; 'workers' and 'batchsize' are their modern
# aliases [jmt12]. All are hidden from GLI.
my $arguments = [
    { 'name'      => "workers",
      'desc'      => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
      'type'      => "int",
      'range'     => "0,",
      'reqd'      => "no",
      'hiddengli' => "yes" },
    { 'name'      => "batchsize",
      'desc'      => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
      'type'      => "int",
      'range'     => "1,",
      'reqd'      => "no",
      'hiddengli' => "yes" },
    { 'name'      => "nolocal",
      'desc'      => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
      'type'      => "flag",
      'reqd'      => "no",
      'hiddengli' => "yes" },
    # legacy alias: workers = jobs - 1
    { 'name'      => "jobs",
      'desc'      => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
      'type'      => "int",
      'range'     => "1,",
      'reqd'      => "no",
      'internal'  => "1",
      'hiddengli' => "yes" },
    # legacy alias for batchsize
    { 'name'      => "epoch",
      'desc'      => "**Parallel Processing** Legacy support - see 'batchsize' above",
      'type'      => "int",
      'range'     => "1,",
      'reqd'      => "no",
      'internal'  => "1",
      'hiddengli' => "yes" },
];
101
102
# @function new()
#
# Constructor. Delegates to inexport::new(), then normalizes the
# parallel-processing options: maps the legacy 'jobs'/'epoch' arguments
# onto 'workers'/'batchsize', sanity-checks both, and - when running in
# parallel - forces -removeold (parallel import cannot incrementally
# update the archives directory).
#
# @param  the same arguments accepted by inexport::new()
# @return a blessed parallelbuildinginexport instance
#
sub new
{
  my $class = shift(@_);
  # direct method call rather than indirect-object syntax ("new inexport")
  my $self = inexport->new(@_);

  $start_time = [&gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch. The defined-checks keep this quiet
  # under 'use warnings' while preserving the original undef-eq-'' behaviour.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '')
      && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '')
      && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    # workers = 0 means serial import (handled by perform_process_files)
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print &makeHeader($message) . "\n";
    print ' Started: ' . @{$start_time}[0] . '.' . @{$start_time}[1] . "\n";
    print ' Workers: ' . $self->{'workers'} . "\n";
    print ' Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 80 . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case the user wants to abort
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
154
155
## @function set_collection_options()
#
# Override that performs the standard inexport option handling and then
# retains a reference to the collection configuration, so later stages
# (database merging in perform_process_files) can look up settings such
# as 'infodbtype'.
#
# @param $collectcfg hashref of the parsed collection configuration
#
sub set_collection_options
{
    my ($self, $collectcfg) = @_;
    $self->SUPER::set_collection_options($collectcfg);
    # keep the configuration around for later consultation
    $self->{'collectcfg'} = $collectcfg;
}
## set_collection_options() ##
166
167
168## @function deinit()
169
170
# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes
# @author hs, 1 july 2010
#
# @param $workers    number of worker processes (MPI rank count is workers + 1)
# @param $batchsize  number of documents per batch handed to a worker
# @param $importdir  path to the import directory being scanned
# @param $block_hash hashref providing 'all_files' and 'file_blocks' lookups
# @param $collection the collection name
# @param $site       site name (GS3) or empty/undef for GS2
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777);
  }

  # create the list of files to import
  # NOTE: $overwrite is hard-coded on, so the reuse-existing-filelist path
  # below is currently disabled. Flip to 0 to reuse a previous scan - this
  # is especially important for imports of 1 million documents as just the
  # directory scan can take several hours.
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  if ($overwrite || !-f $tmp_filelist)
  {
    # three-argument open with an explicit error check (was unchecked 2-arg)
    open(my $filelist, '>', $tmp_filelist) or die("Failed to open filelist for writing: $tmp_filelist");
    # randomize the file order so each worker batch gets a representative mix
    my @filenames = shuffle(keys %{$block_hash->{'all_files'}});
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      # skip blocked files, and metadata.xml which is handled separately
      if ((!exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close($filelist) or die("Failed to close filelist: $tmp_filelist");
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    $mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';
  # fix for mpi binding to incorrect interface device (seems to have an
  # unhealthy obsession with virbr0)
  $mpi_flags .= ' --mca btl tcp,sm,self --mca btl_tcp_if_include eth0 ';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find this executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';

  # -n is workers plus one: the extra rank is the master/farmer process
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # lexical read-pipe handle (was the bareword MPI with a trailing-pipe open)
  open(my $mpi_out, '-|', $mpi_cmd) or die("Couldn't Execute MPI");
  while (defined(my $line = <$mpi_out>))
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_out);
}
# _farmOutProcesses()
254# _farmOutProcesses()
255
256
## @function getSupportedArguments()
#
# Retrieve the list of arguments that are specific to this subclass of inexport
# so they can be added to the list of supported arguments to import.pl. The use
# of any of these arguments automatically causes this subclass to be
# instantiated and used in preference to the parent class. ATM it is up to the
# implementer to ensure these arguments are unique.
#
# @return arrayref of argument-description hashes ($arguments, defined above)
#
sub getSupportedArguments
{
    # returns the shared arrayref, not a copy - callers must not mutate it
    return $arguments;
}
## getSupportedArguments() ##
270
271
272################################################################################
273##### Overrides
274################################################################################
275
276
## @function perform_process_files()
#
# Override: when one or more workers are configured, farm the import out
# over MPI via _farmOutProcesses(); otherwise fall back to the standard
# serial implementation in the parent class. After a cluster-mode import
# (signalled by the presence of mpi.conf) the per-host archive databases
# are merged back into the single master database, provided the
# configured infodbtype supports merging.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel Import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Merge the individual per-host database parts into one single database
  my $infodb_type = $self->{'collectcfg'}->{'infodbtype'};
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  if (-f $mpi_conf_path && &dbutil::supportsMerge($infodb_type))
  {
    print STDERR ' * Merging ' . $infodb_type . ' databases... ';
    my @databases = ('archiveinf-src', 'archiveinf-doc');
    foreach my $database (@databases)
    {
      # generate the path to the target database without any hostname suffix
      my $archive_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, '');
      # lexical filehandle (was the bareword MPIHOSTS)
      open(my $mpihosts, '<:utf8', $mpi_conf_path) or die("Failed to open mpi.conf for reading");
      while (my $line = <$mpihosts>)
      {
        # each usable mpi.conf line is expected to begin with a hostname
        if ($line =~ /^([a-z0-9\-]+)/i)
        {
          my $hostname = $1;
          # per-host databases carry the hostname as a suffix
          my $mergable_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, $hostname);
          if (-f $mergable_db_path)
          {
            &dbutil::merge_databases($infodb_type, $mergable_db_path, $archive_db_path);
          }
        }
      }
      close($mpihosts);
    }
    print "Done!\n";
  }
}
## perform_process_files() ##
336
## @function generate_statistics()
#
# Write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
#
# @param $pluginfo the plugin information structure (currently unused while
#        per-worker stats aggregation remains unimplemented - see below)
#
sub generate_statistics
{
  my $self = shift(@_);
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    # three-argument open so the mode can never be injected via the filename
    # NOTE(review): STATS is opened in this package, yet the symbolic name
    # recorded below is 'inexport::STATS' - harmless while the
    # plugin::write_stats call further down is commented out, but verify
    # the package qualification before re-enabling it.
    if (open (STATS, '>', $statsfile))
    {
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
394## generate_statistics() ##
395
396
## @function makeHeader($msg, [$length])
#
# Create a centered header string given a certain message padded with '=' characters.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string centered with '=' as padding
#
sub makeHeader
{
    my ($msg, $length) = @_;
    $length = 80 unless defined $length;
    # Padding per side; the 'x' repetition operator truncates the fraction
    # that arises when $length - length($msg) is odd.
    my $pad_count = ($length - 2 - length($msg)) / 2;
    my $pad = '=' x $pad_count;
    # Odd-length messages lose half a column to truncation on each side,
    # so one extra '=' goes on the right to reach exactly $length.
    my $right = (length($msg) % 2 == 1) ? ' =' . $pad : ' ' . $pad;
    return $pad . ' ' . $msg . $right;
}
## makeHeader() ##
4251;
Note: See TracBrowser for help on using the repository browser.