source: gs2-extensions/parallel-building/trunk/src/perllib/parallelbuildinginexport.pm@ 28654

Last change on this file since 28654 was 28654, checked in by jmt12, 10 years ago

Removed recordEarliestDatestamp() function as that now lurks in the dynamically loaded infodb drivers (this is to allow the xmlstdout infodb driver needed for full Hadoop support)

File size: 10.1 KB
RevLine 
[27279]1###############################################################################
2#
3# parallelbuildinginexport.pm -- support parallel import.pl by extending
4# inexport with parallel processing awareness and functionality
5#
6# A component of the Greenstone digital library software from the New Zealand
7# Digital Library Project at the University of Waikato, New Zealand.
8#
9# Copyright (C) 2013 New Zealand Digital Library Project
10#
11# This program is free software; you can redistribute it and/or modify it under
12# the terms of the GNU General Public License as published by the Free Software
13# Foundation; either version 2 of the License, or (at your option) any later
14# version.
15#
16# This program is distributed in the hope that it will be useful, but WITHOUT
17# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
18# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
19# details.
20#
21# You should have received a copy of the GNU General Public License along with
22# this program; if not, write to the Free Software Foundation, Inc., 675 Mass
23# Ave, Cambridge, MA 02139, USA.
24#
25###############################################################################
26
27# @author Hussein Suleman [h1], University of Cape Town, South Africa
28# @author John Thompson [jmt12], Greenstone DL Research Group
29
30package parallelbuildinginexport;
31
[28001]32# Pragma
[27279]33use strict;
[28001]34no strict 'refs'; # allow filehandles to be variables and vice versa
35no strict 'subs'; # allow barewords (eg STDERR) as function arguments
[27279]36
37# MODULES
38# Randomize the order of files in the filelist
39use List::Util qw( shuffle );
40
41# Greenstone Modules
42use gsprintf 'gsprintf';
43use inexport;
44
BEGIN
{
  # Establish inheritance at compile time: this module is a drop-in
  # replacement for inexport, overriding only the parallel-aware methods.
  @parallelbuildinginexport::ISA = ('inexport');
}
49
50# jobs and epoch added for parallel processing [hs, 1 july 2010]
51# added aliases 'workers' and 'batchsize' [jmt12]
# jobs and epoch added for parallel processing [hs, 1 july 2010]
# added aliases 'workers' and 'batchsize' [jmt12]
# Extra command-line arguments merged into import.pl's option list; the use
# of any of them triggers instantiation of this subclass (see
# getSupportedArguments() below).
my $arguments = [
      { 'name' => "workers",
        'desc' => "**Parallel Processing** The number of 'worker' threads to spawn when parallel processing",
        'type' => "int",
        'range' => "0,",
        'reqd' => "no",
        'hiddengli' => "yes" },
      { 'name' => "batchsize",
        'desc' => "**Parallel Processing** The number of documents in each 'batch' allocated to a worker thread for processing",
        'type' => "int",
        'range' => "1,",
        'reqd' => "no",
        'hiddengli' => "yes" },
      { 'name' => "nolocal",
        'desc' => "**Parallel Processing** Do not run any worker threads on the compute node running this import - for use on cluster or other multiple machine imports only",
        'type' => "flag",
        'reqd' => "no",
        'hiddengli' => "yes" },
      # Legacy aliases: jobs = workers + 1, epoch = batchsize (mapped in new())
      { 'name' => "jobs",
        'desc' => "**Parallel Processing** Legacy support - see 'workers' above where workers equals jobs - 1",
        'type' => "int",
        'range' => "1,",
        'reqd' => "no",
        'internal' => "1",
        'hiddengli' => "yes" },
      { 'name' => "epoch",
        'desc' => "**Parallel Processing** Legacy support - see 'batchsize' above",
        'type' => "int",
        'range' => "1,",
        'reqd' => "no",
        'internal' => "1",
        'hiddengli' => "yes" },
      ];
85
86# @function new()
87# Constructor
88#
# @function new()
# Constructor. Builds the parent inexport object, maps the legacy 'jobs' and
# 'epoch' options onto 'workers' and 'batchsize', and sanity-checks the
# parallel-processing configuration (forcing -removeold when running in
# parallel, since worker processes cannot safely merge into an existing
# archives directory).
#
sub new
{
  my $class = shift(@_);
  # Direct method call rather than deprecated indirect object syntax
  # ("new inexport(@_)"), which Perl can misparse when a sub named 'new'
  # is in scope.
  my $self = inexport->new(@_);

  # Legacy support: jobs counted the master process too, hence the - 1
  if ($self->{'workers'} eq '' && $self->{'jobs'} ne '')
  {
    $self->{'workers'} = $self->{'jobs'} - 1;
  }
  if ($self->{'batchsize'} eq '' && $self->{'epoch'} ne '')
  {
    $self->{'batchsize'} = $self->{'epoch'};
  }

  # Sanity Check
  if ($self->{'batchsize'} !~ /^\d+$/)
  {
    print STDERR "WARNING: batchsize missing or not a number - assuming batchsize = 1\n";
    $self->{'batchsize'} = 1;
  }
  if ($self->{'workers'} !~ /^\d+$/ || $self->{'workers'} < 1)
  {
    print STDERR "WARNING: parallel processing not available with fewer than one worker - assuming serial import\n";
    $self->{'workers'} = 0;
  }
  else
  {
    print "Performing Parallel Import: workers:" . $self->{'workers'} . ", batchsize:" . $self->{'batchsize'} . "\n";
    if (!$self->{'removeold'})
    {
      print "WARNING: Parallel Processing requires -removeold. Current contents of archives directory will be deleted.\n";
      sleep(3); #just in case
      $self->{'removeold'} = 1;
    }
  }

  return bless($self, $class);
}
128# new()
129
130# @function _farmOutProcesses()
131# Index the files in parallel using MPI farmer to farm off multiple processes
132# @author hs, 1 july 2010
133#
# @function _farmOutProcesses()
# Index the files in parallel using MPI farmer to farm off multiple processes
# Writes a (shuffled) list of the files to import into tmp/filelist.txt, then
# launches the 'mpiimport' farmer via mpirun with workers + 1 ranks (one rank
# is the master) and streams its output to STDOUT.
# @param $workers    number of worker ranks to request
# @param $batchsize  documents per batch handed to each worker
# @param $importdir  root directory of the files being imported
# @param $block_hash hashref with 'all_files' and 'file_blocks' members
# @param $collection collection name passed through to the farmer
# @param $site       GS3 site name ('' for GS2; also selects GSDL3HOME)
# @author hs, 1 july 2010
#
sub _farmOutProcesses
{
  my $self = shift(@_);
  my ($workers, $batchsize, $importdir, $block_hash, $collection, $site) = @_;

  my $tmp_dir_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'tmp');
  if (!-d $tmp_dir_path)
  {
    mkdir($tmp_dir_path, 0777);
  }

  # create the list of files to import
  my $overwrite = 1;
  my $tmp_filelist = &util::filename_cat($tmp_dir_path, "filelist.txt");
  # - if the file is already there (which it should be during testing) then
  #   don't regenerate. This is especially important for imports of 1 million
  #   documents as just the directory scan can take several hours.
  if ($overwrite || !-f $tmp_filelist)
  {
    # 3-arg open with a lexical handle, and die rather than silently
    # handing the farmer an empty filelist if the open fails
    open(my $filelist, '>', $tmp_filelist) or die("Couldn't write filelist: " . $tmp_filelist . " (" . $! . ")");
    # shuffle so expensive documents are spread across batches
    my @filenames = shuffle(keys %{$block_hash->{'all_files'}});
    foreach my $filename (@filenames)
    {
      my $full_filename = &util::filename_cat($importdir, $filename);
      # skip blocked files and metadata.xml (handled by the plugins directly)
      if ((! exists $block_hash->{'file_blocks'}->{$full_filename}) && ($filename !~ m/metadata\.xml$/))
      {
        print $filelist "$filename\n";
      }
    }
    close($filelist);
  }

  # Determine if we've been provided a mpi.conf file to indicate the other
  # machines (slave nodes) this parallizable process should run on
  my $mpi_conf_path = &util::filename_cat($ENV{'GSDLCOLLECTDIR'}, 'mpi.conf');
  my $mpi_flags = '';
  if (-f $mpi_conf_path)
  {
    print STDERR " ***** CLUSTER MODE *****\n";
    $mpi_flags .= '-machinefile "' . $mpi_conf_path . '" ';
    #$mpi_flags .= '-nolocal ';
  }
  else
  {
    print STDERR " ***** SINGLE COMPUTER MODE *****\n";
  }
  $mpi_flags .= ' --show-progress --timestamp-output --verbose';

  # invoke the farmer to start processing the files
  # - a non-empty site implies Greenstone 3, which roots at GSDL3HOME
  my $gsdlhome;
  if (defined $site && $site ne '')
  {
    $gsdlhome = $ENV{'GSDL3HOME'};
  }
  else
  {
    $site = "";
    $gsdlhome = $ENV{'GSDLHOME'};
  }

  # commands now assume path is correct to find this executables (as they
  # will be under the new extensions framework)
  my $farmer_exe = 'mpiimport';

  # workers + 1 ranks: rank 0 is the master that deals out batches
  my $mpi_cmd = 'mpirun ' . $mpi_flags . ' -n ' . ($workers + 1) . ' ' . $farmer_exe . ' ' . $tmp_filelist . ' ' . $batchsize . ' ' . $gsdlhome . ' ' . $collection . ' ' . $site;
  print STDERR "MPI Command: \"" . $mpi_cmd . "\"\n";

  # 3-arg piped open with a lexical handle (was a bareword MPI handle)
  open(my $mpi_out, '-|', $mpi_cmd) or die("Couldn't Execute MPI");
  while ( defined( my $line = <$mpi_out> ) )
  {
    chomp($line);
    print "$line\n";
  }
  close($mpi_out);
}
210# _farmOutProcesses()
211
212# @function getSupportedArguments
213# Retrieve the list of arguments that are specific to this subclass of inexport
214# so they can be added to the list of supported arguments to import.pl. The
215# use of any of these arguments automatically causes this subclass to be
216# instantiated and used in preference to the parent class. ATM it is up to the
217# implementer to ensure these arguments are unique.
sub getSupportedArguments
{
  # Simply expose the module-level $arguments list declared above
  return $arguments;
}
222# getSupportedArguments()
223
224################################################################################
225##### Overrides
226################################################################################
227
[27995]228
229## @function perform_process_files()
[27279]230#
## @function perform_process_files()
# Override of inexport::perform_process_files(). When one or more worker
# threads have been configured the import is farmed out over MPI; otherwise
# the inherited serial implementation is used unchanged.
#
sub perform_process_files
{
  my $self = shift(@_);
  my @process_args = @_;
  my ($manifest, $pluginfo, $importdir, $file_to_import, $block_hash, $metadata, $processor, $maxdocs) = @_;

  # Serial import: no workers requested, so defer to the parent class
  if ($self->{'workers'} < 1)
  {
    return $self->SUPER::perform_process_files(@process_args);
  }

  # Parallel Import - call the function written by HS
  $self->_farmOutProcesses($self->{'workers'}, $self->{'batchsize'}, $importdir, $block_hash, $self->{'collection'}, $self->{'site'});
}
[27995]252## perform_process_files() ##
[27279]253
[27995]254## @function generate_statistics()
255#
256# Write out import stats - only output statistics if there are multiple jobs
[27279]257# @author hs, 1 july 2010
[27995]258#
sub generate_statistics
{
  my $self = shift @_;
  my ($pluginfo) = @_;

  my $inexport_mode = $self->{'mode'};

  my $statsfile = $self->{'statsfile'};
  my $out = $self->{'out'};
  my $faillogname = $self->{'faillogname'};
  my $gli = $self->{'gli'};

  # Remember whether we opened STATS ourselves so we can close it at the end
  my $close_stats = 0;
  # Child processes should have been sent to file, only the parent process
  # should be writing to STD*.
  if ($statsfile !~ /^(STDERR|STDOUT)$/i)
  {
    if (open (STATS, ">$statsfile"))
    {
      # NOTE(review): the bareword STATS handle is deliberate - its
      # package-qualified name is what would be handed to
      # plugin::write_stats() (currently commented out below)
      $statsfile = 'inexport::STATS';
      $close_stats = 1;
    }
    else
    {
      # Couldn't open the requested stats file; warn and fall back to STDERR
      &gsprintf($out, "{import.cannot_open_stats_file}", $statsfile);
      &gsprintf($out, "{import.stats_backup}\n");
      $statsfile = 'STDERR';
    }
  }
  # Master thread. In the future I should do something smarter here, like read
  # in the stats from all the worker threads stat files, add them up, then
  # create a dummy plugin object and add it to pluginfo. Otherwise the results
  # always show 0 documents considered and 0 included...
  else
  {

  }

  &gsprintf($out, "\n");
  &gsprintf($out, "*********************************************\n");
  &gsprintf($out, "{$inexport_mode.complete}\n");
  &gsprintf($out, "*********************************************\n");

  # ... but for now just comment this out
  #&plugin::write_stats($pluginfo, $statsfile, $faillogname, $gli);
  &gsprintf($out, "* Review gsimport-W*.log files for any warnings and errors\n");

  if ($close_stats)
  {
    close STATS;
  }
}
[27995]311## generate_statistics() ##
[27279]312
[28001]313
[27279]3141;
Note: See TracBrowser for help on using the repository browser.