source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm@ 27279

Last change on this file since 27279 was 27279, checked in by jmt12, 11 years ago

A subclass of inexport.pm with added functionality for parallel processing using OpenMPI

File size: 9.9 KB

###############################################################################
#
# parallelbuildinginexport.pm -- support parallel import.pl by extending
# inexport with parallel processing awareness and functionality
#
# A component of the Greenstone digital library software from the New Zealand
# Digital Library Project at the University of Waikato, New Zealand.
#
# Copyright (C) 2013 New Zealand Digital Library Project
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
# Ave, Cambridge, MA 02139, USA.
#
###############################################################################

# @author Hussein Suleman [h1], University of Cape Town, South Africa
# @author John Thompson [jmt12], Greenstone DL Research Group

package parallelbuildinginexport;

use strict;

# MODULES
# Randomize the order of files in the filelist
use List::Util qw( shuffle );

# Greenstone Modules
use gsprintf 'gsprintf';
use inexport;
use util; # provides &util::filename_cat(), used below

BEGIN
{
  @parallelbuildinginexport::ISA = ('inexport');
}

# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
my $arguments = [
  { 'name' => "workers",
    'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
    'type' => "int",
    'range' => "0,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "batchsize",
    'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "nolocal",
    'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple-machine imports only",
    'type' => "flag",
    'reqd' => "no",
    'hiddengli' => "yes" },
  { 'name' => "jobs",
    'desc' => "**Parallel Processing** Legacy support - see 'workers' above, where workers equals jobs - 1",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  { 'name' => "epoch",
    'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
    'type' => "int",
    'range' => "1,",
    'reqd' => "no",
    'internal' => "1",
    'hiddengli' => "yes" },
  ];

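# Usage sketch (illustrative only, not part of the original source): once
# import.pl has merged these arguments into its supported options, a parallel
# import might be started with something along the lines of
#
#   import.pl -workers 4 -batchsize 20 mycollection
#
# where 'mycollection' and the numeric values are placeholders.
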
# @function new()
# Constructor
#
sub new
{
  my $class = shift(@_);
  my $self = new inexport(@_);

  # Legacy support
  if ($self->{'workers'} eq '' && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ($self->{'batchsize'} eq '' && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if ($self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "WARNING: batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if ($self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "WARNING: parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    print "Performing Parallel Import: workers:" . $self->{'workers'} . ", batchsize:" . $self->{'batchsize'} . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
# new()

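# For example (hypothetical values): invoking import.pl with the legacy options
# '-jobs 5 -epoch 10' and no explicit -workers/-batchsize is mapped by the
# constructor above to workers = 4 and batchsize = 10.
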
# @function _farmOutProcesses()
# Index the files in parallel, using the MPI farmer to farm off multiple
# processes
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777);
  }

  # create the list of files to import
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate it. This is especially important for imports of 1 million
  #   documents, as just the directory scan can take several hours. Note that
  #   with $overwrite set to 1 above, the list is always regenerated.
  if ($overwrite || !-f $tmp_filelist)
  {
    open (my $filelist, ">$tmp_filelist");
    my @filenames = keys %{$block_hash->{'all_files'}};
    @filenames = shuffle(@filenames);
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir,$filename);
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close ($filelist);
  }

  # Determine if we've been provided an mpi.conf file to indicate the other
  # machines (slave nodes) this parallelizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    #$mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';

  # invoke the farmer to start processing the files
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume the path is correct to find these executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = 'mpiimport';

  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  open(MPI, $mpi_cmd . " |") or die("Couldn't Execute MPI");
  while ( defined( my $line = <MPI> ) )
  {
    chomp($line);
    print "$line\n";
  }
  close(MPI);
}
# _farmOutProcesses()

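# Illustrative sketch (placeholder values, not part of the original source):
# with workers=4, batchsize=20, no mpi.conf present, and a GS2 collection named
# 'mycollection', the command assembled above resembles
#
#   mpirun --show-progress --timestamp-output --verbose -n 5 mpiimport \
#       $GSDLCOLLECTDIR/tmp/filelist.txt 20 $GSDLHOME mycollection
#
# where -n is workers + 1 (presumably one rank for the farmer process plus one
# per worker) and the trailing site argument is empty for GS2 collections.
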
# @function getSupportedArguments()
# Retrieve the list of arguments that are specific to this subclass of inexport
# so they can be added to the list of supported arguments to import.pl. The use
# of any of these arguments automatically causes this subclass to be
# instantiated and used in preference to the parent class. At the moment it is
# up to the implementer to ensure these arguments are unique.
sub getSupportedArguments
{
  return $arguments;
}
# getSupportedArguments()

################################################################################
##### Overrides
################################################################################

# @function perform_process_files()
#
sub perform_process_files
{
  my $self = shift(@_);
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;
  my $batchsize = $self->{'batchsize'};
  my $collection = $self->{'collection'};
  my $site = $self->{'site'};
  my $workers = $self->{'workers'};

  # Parallel Import
  if ($workers > 0)
  {
    # Call the function written by HS
    $self->_farmOutProcesses($workers, $batchsize, $importdir, $block_hash, $collection, $site);
  }
  # Serial import
  else
  {
    $self->SUPER::perform_process_files($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs);
  }
}
# perform_process_files()

# @function generate_statistics()
# write out import stats - only output statistics if there are multiple jobs
# @author hs, 1 july 2010
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  my $close_stats = 0;
  # Child processes should have been sent to file; only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    if (open (STATS, ">$statsfile"))
    {
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads' stats files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {

  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
# generate_statistics()

1;