source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm@ 29276

Last change on this file since 29276 was 29276, checked in by jmt12, 10 years ago

I need to measure the time spent on generating the initial manifest, as serial processing currently doesn't do this.

File size: 13.7 KB
Line 
1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
32# Pragma
33use strict;
34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40use Time::HiRes qw( gettimeofday tv_interval );
41
42# Greenstone Modules
43use gsprintf 'gsprintf';
44use inexport;
45
# Package-level timing state shared by new(), _farmOutProcesses() and the
# END block:
#  - $start_time is the [seconds, microseconds] pair captured in new()
#  - $filelist_duration is the number of seconds spent writing the file list
our ($start_time, $filelist_duration);

# Set up inheritance from inexport at compile time so inherited methods and
# SUPER:: calls resolve before any other code in this module runs.
BEGIN
{
  @parallelbuildinginexport::ISA = qw( inexport );
}
53
# Print a closing banner when the program exits. It shows the end timestamp,
# the time spent generating the raw manifest (only if _farmOutProcesses()
# recorded it) and the total duration since new() captured $start_time.
# Nothing is printed if $start_time was never set, i.e. no object was
# constructed.
END
{
  if (defined $start_time)
  {
    my $end_time = [&gettimeofday()];
    my $duration = tv_interval($start_time, $end_time);
    print &makeHeader('Parallel Import Complete') . "\n";
    # gettimeofday() returns (seconds, microseconds). The microseconds must be
    # zero-padded to six digits, otherwise e.g. 5us would print as ".5".
    print ' Ended: ' . sprintf('%d.%06d', $end_time->[0], $end_time->[1]) . "\n";
    if (defined $filelist_duration)
    {
      print ' Generating raw manifest: ' . sprintf('%0.6f', $filelist_duration) . "\n";
    }
    print ' Duration: ' . sprintf('%0.6f', $duration) . "\n";
    print '=' x 80 . "\n";
  }
}
70
# Arguments specific to parallel importing, returned by
# getSupportedArguments() so import.pl can accept them. 'jobs' and 'epoch'
# are the original names from the first parallel implementation
# [hs, 1 july 2010]. 'workers' and 'batchsize' are the newer aliases
# [jmt12]. Every option is optional and hidden from the GLI.
my $arguments = do
{
  my %common = ('reqd' => 'no', 'hiddengli' => 'yes');
  [
    { %common,
      'name'  => 'workers',
      'desc'  => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
      'type'  => 'int',
      'range' => '0,' },
    { %common,
      'name'  => 'batchsize',
      'desc'  => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
      'type'  => 'int',
      'range' => '1,' },
    { %common,
      'name'  => 'nolocal',
      'desc'  => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
      'type'  => 'flag' },
    { %common,
      'name'     => 'jobs',
      'desc'     => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
      'type'     => 'int',
      'range'    => '1,',
      'internal' => '1' },
    { %common,
      'name'     => 'epoch',
      'desc'     => "**Parallel Processing** Legacy support - see 'batchsize' above",
      'type'     => 'int',
      'range'    => '1,',
      'internal' => '1' },
  ];
};
106
107
# @function new()
# Constructor. Builds the parent inexport object, records the import start
# time and maps the legacy 'jobs'/'epoch' arguments onto 'workers'/'batchsize'.
# It then validates both values. If at least one worker is requested, it
# announces the parallel import and forces 'removeold' on.
#
# @param $class The class to bless the new object into
# @param ...    All remaining arguments are passed unchanged to inexport->new()
# @return The new object, blessed into $class
#
sub new
{
  my $class = shift(@_);
  my $self = inexport->new(@_);

  $start_time = [&gettimeofday()];

  # Legacy support - Dr Suleman initially had different names for these
  # arguments, namely jobs and epoch. 'jobs' presumably counted every MPI
  # process (mpirun is later started with workers + 1), hence the - 1.
  if ((!defined $self->{'workers'} || $self->{'workers'} eq '')
      && defined $self->{'jobs'} && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ((!defined $self->{'batchsize'} || $self->{'batchsize'} eq '')
      && defined $self->{'epoch'} && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if (!defined $self->{'batchsize'} || $self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "Warning! Batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if (!defined $self->{'workers'} || $self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "Warning! Parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    my $message = 'Performing Parallel Import';
    print &makeHeader($message) . "\n";
    # Zero-pad the microseconds so the timestamp reads correctly
    print ' Started: ' . sprintf('%d.%06d', $start_time->[0], $start_time->[1]) . "\n";
    print ' Workers: ' . $self->{'workers'} . "\n";
    print ' Batchsize: ' . $self->{'batchsize'} . "\n";
    print '=' x 80 . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); # give the user a moment to notice (and abort) before deletion
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()
159
160
## @function set_collection_options($collectcfg)
#
# Let the parent class apply the collection configuration, then keep a
# reference to it on the object so perform_process_files() can read the
# 'infodbtype' setting later.
#
# @param $collectcfg The collection configuration (a hash reference)
# @return The stored $collectcfg
#
sub set_collection_options
{
  my ($self, $collectcfg) = @_;
  $self->SUPER::set_collection_options($collectcfg);
  return $self->{'collectcfg'} = $collectcfg;
}
## set_collection_options() ##
171
172
## @function deinit() - no override is defined in this module
174
175
# @function _farmOutProcesses()
# Index the files in parallel using the MPI farmer (mpiimport) to farm off
# multiple worker processes.
#
# Steps:
#  1. Write the shuffled list of files to import to
#     $GSDLCOLLECTDIR/tmp/filelist.txt, recording the time taken in
#     $filelist_duration.
#  2. Run mpirun with $workers + 1 processes and echo its output to STDOUT.
#     If $GSDLCOLLECTDIR/mpi.conf exists, it is passed as the MPI machine
#     file and the local node is excluded (cluster mode).
#
# @param $workers    Number of worker processes (one more process is started)
# @param $batchsize  Batch size passed through to mpiimport
# @param $importdir  The collection's import directory
# @param $block_hash Uses the 'all_files' and 'file_blocks' entries
# @param $collection Collection name passed through to mpiimport
# @param $site       Greenstone 3 site name, or undef/'' when there is none
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    # re-test -d in case the directory appeared between the check and mkdir
    mkdir($tmp_dir_path, 0777) or -d $tmp_dir_path
      or die("Failed to create directory $tmp_dir_path: $!");
  }

  # create the list of files to import
  my $filelist_start_time = [&gettimeofday()];
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours. Note that
  #   $overwrite is currently hard-wired on, so the list is always rebuilt.
  if ($overwrite || !-f $tmp_filelist)
  {
    open(my $filelist, '>', $tmp_filelist)
      or die("Failed to open $tmp_filelist for writing: $!");
    # randomize the order so that expensive files are spread across batches
    foreach my $filename (shuffle(keys %{$block_hash->{'all_files'}}))
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      next if exists $block_hash->{'file_blocks'}->{$full_filename};
      next if $filename =~ m/metadata\.xml$/;
      print $filelist "$filename\n";
    }
    close($filelist) or die("Failed to write $tmp_filelist: $!");
  }
  $filelist_duration = tv_interval($filelist_start_time, [&gettimeofday()]);

  # The command is built as a list and run without a shell, so paths and
  # names containing spaces or shell metacharacters are passed through intact.
  my @mpi_cmd = ('mpirun');

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    push(@mpi_cmd, '-machinefile', $mpi_conf_path, '-nolocal');
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  push(@mpi_cmd, '--show-progress', '--timestamp-output', '--verbose');
  # fix for mpi binding to incorrect interface device (seems to have an
  # unhealthy obsession with virbr0)
  push(@mpi_cmd, '--mca', 'btl', 'tcp,sm,self', '--mca', 'btl_tcp_if_include', 'eth0');

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find this executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = $gsdlhome . '/ext/parallel-building/' . $ENV{'GSDLOS'} . '/bin/mpiimport';

  push(@mpi_cmd, '-n', $workers + 1, $farmer_exe, $tmp_filelist, $batchsize, $gsdlhome, $collection);
  # With no site, mpiimport receives no site argument at all (not an empty one)
  push(@mpi_cmd, $site) if $site ne '';
  print STDERR "MPI Command: \"" . join(' ', @mpi_cmd) . "\"\n";

  open(my $mpi, '-|', @mpi_cmd) or die("Couldn't Execute MPI: $!");
  while (defined(my $line = <$mpi>))
  {
    chomp($line);
    print "$line\n";
  }
  if (!close($mpi))
  {
    # closing a pipe reports the child's exit status in $?
    print STDERR "Warning! MPI exited with status " . ($? >> 8) . "\n";
  }
}
# _farmOutProcesses()
263
264
## @function getSupportedArguments()
#
# Return the argument definitions that belong to this subclass of inexport,
# so that import.pl can add them to its own list of supported arguments.
# Supplying any of these arguments causes this subclass to be instantiated
# instead of the parent class. Keeping the names unique is currently the
# implementer's responsibility.
#
# @return Array reference of argument description hashes
#
sub getSupportedArguments
{
  my $supported = $arguments;
  return $supported;
}
## getSupportedArguments() ##
278
279
280################################################################################
281##### Overrides
282################################################################################
283
284
## @function perform_process_files()
#
# Override of inexport::perform_process_files().
# - One or more workers: the files are farmed out to MPI worker processes via
#   _farmOutProcesses(), and the per-host databases are then merged.
# - Otherwise: the parent's serial implementation is used unchanged.
#
# The arguments are those of inexport::perform_process_files(). They are
# passed through untouched for a serial import; a parallel import only uses
# $importdir and $block_hash.
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $workers = $self->{'workers'};

  # Serial import
  if (!($workers > 0))
  {
    # Only a parallel import produces per-host databases, so there is nothing
    # to merge here. Merging would only pull in stale databases left over
    # from an earlier parallel run.
    return $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }

  # Parallel Import - call the function written by HS
  $self->_farmOutProcesses($workers, $self->{'batchsize'}, $importdir, $block_hash, $self->{'collection'}, $self->{'site'});

  # merge the individual parts into one single database
  $self->_mergeHostDatabases();
}
## perform_process_files() ##

## @function _mergeHostDatabases()
#
# Merge the archiveinf-src and archiveinf-doc databases of each host listed
# in $GSDLCOLLECTDIR/mpi.conf into the collection's main (suffix-less)
# databases. Does nothing unless mpi.conf exists and dbutil supports merging
# for the collection's infodbtype.
#
sub _mergeHostDatabases
{
  my $self = shift(@_);
  my $infodb_type = $self->{'collectcfg'}->{'infodbtype'};
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  return unless (-f $mpi_conf_path && &dbutil::supportsMerge($infodb_type));

  # The host name is the leading alphanumeric/hyphen token of each line.
  # Read the file once rather than once per database.
  open(my $mpi_hosts, '<:utf8', $mpi_conf_path) or die("Failed to open mpi.conf for reading");
  my @hostnames;
  while (my $line = <$mpi_hosts>)
  {
    if ($line =~ /^([a-z0-9\-]+)/i)
    {
      push(@hostnames, $1);
    }
  }
  close($mpi_hosts);

  print STDERR ' * Merging ' . $infodb_type . ' databases... ';
  foreach my $database ('archiveinf-src', 'archiveinf-doc')
  {
    # generate the path to the target database without any hostname suffix
    my $archive_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, '');
    foreach my $hostname (@hostnames)
    {
      my $mergable_db_path = &dbutil::get_infodb_file_path($infodb_type, $database, $self->{'archivedir'}, 0, $hostname);
      if (-f $mergable_db_path)
      {
        &dbutil::merge_databases($infodb_type, $mergable_db_path, $archive_db_path);
      }
    }
  }
  # same stream as the "Merging" message so the two halves stay on one line
  print STDERR "Done!\n";
}
## _mergeHostDatabases() ##
344
## @function generate_statistics()
#
# Write out the end-of-import banner to the 'out' handle.
#
# The per-plugin statistics are deliberately not written. According to the
# notes below, the master process's plugins would always report 0 documents
# considered and 0 included. Instead, the user is pointed at the workers'
# gsimport-W*.log files.
#
# If 'statsfile' names a file rather than STDERR/STDOUT, that file is still
# opened for writing (and so truncated) and closed again. This keeps it
# ready for a future implementation that aggregates the workers' statistics.
#
# @param $pluginfo Plugin information (only needed by the disabled
#                  write_stats call)
# @author hs, 1 july 2010
#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  # used only by the currently disabled plugin::write_stats() call below
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  # For the master thread, something smarter would be better in future: read
  # in the stats from all the worker threads' stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    if (open(STATS, '>', $statsfile))
    {
      # The bareword STATS lives in this package, not in inexport, so the
      # handle name handed on must be qualified with this package's name.
      $statsfile = __PACKAGE__ . '::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
## generate_statistics() ##
403
404
## @function makeHeader($msg, [$length])
#
# Create a header string of exactly $length characters. $msg is centred in
# it, with one space on each side and '=' characters as padding. When the
# padding cannot be split evenly, the extra '=' goes on the right. If $msg
# is too long to fit, the result is just $msg with the two surrounding
# spaces, and so is longer than $length.
#
# @param $msg The message to center as a string
# @param $length The desired length of string - defaults to 80
# @return A string centered with '=' as padding
#
sub makeHeader
{
  my ($msg, $length) = @_;
  if (!defined $length)
  {
    $length = 80;
  }
  # Split the padding that remains after the two spaces. Testing only the
  # parity of $msg made the result one character off whenever $length was
  # odd.
  my $padding = $length - 2 - length($msg);
  $padding = 0 if $padding < 0;
  my $left = int($padding / 2);
  my $right = $padding - $left;
  return ('=' x $left) . ' ' . $msg . ' ' . ('=' x $right);
}
## makeHeader() ##
1;
Note: See TracBrowser for help on using the repository browser.