source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27001

Last change on this file since 27001 was 27001, checked in by jmt12, 11 years ago

Passing more environment variables (HADOOPPREFIX, HDFSHOST, HDFSPORT) through to hadoop (and thus on to compute nodes). Debug comments. Directory not empty test before attempting to copy log files preventing non-fatal errors. Whether on cluster determined by presence of 'rocks' executable rather than hardcoded hostname (one less thing for me to forget to change between computers)

  • Property svn:executable set to *
File size: 8.7 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFSHOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFSPORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

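# Typical invocation (a sketch; assumes Greenstone's setup.bash and the
# parallel-building extension's setup.bash have already been sourced, as the
# BEGIN block above requires):
#
#   ./hadoop_import.pl <collection>
#
# where <collection> names a directory under $GSDLHOME/collect; it defaults to
# 'test' when omitted.
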
print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # assumed to be on the PATH; prepend a full path if necessary
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'};
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

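# - determine whether we are running on a Rocks cluster by checking that the
#   'rocks' command runs successfully, rather than relying on a hardcoded
#   hostname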
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
my $hdfs_input_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
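# - for illustration only (host, port, and user here are made up), the input
#   directory ends up looking like:
#     hdfs://namenode:9000/user/jsmith/gsdl/collect/test/import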

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if the import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of the import directory into HDFS,
  #   ensuring that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files put into HDFS");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  print " * Clearing existing archives directory for this collection... ";
  &hdfsCommand('rmr', $hdfs_output_dir);
  print "Done!\n";
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
}
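# - note: step 6 below gathers whatever is left in /tmp/greenstone into the
#   results directory, so clearing it here (on the head node and, under Rocks,
#   on every compute node) keeps logs from earlier runs out of this run's results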

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
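  #   (the "#a:" message registers this script's process ID as a listener; the
  #   matching "#r:" and "#q:" messages in step 5 deregister it and shut the
  #   server down)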
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print " configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection.
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
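# - the arguments handed to HadoopGreenstoneIngest are, in order: the Greenstone
#   install directory, the HDFS host and port, the Hadoop prefix, the collection
#   name, and the import (input) and archives (output) directories within HDFS;
#   all stdout/stderr from the job is captured in hadoop.log under the results
#   directory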
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $ENV{'HDFSHOST'} . '" ' . $ENV{'HDFSPORT'} . ' "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . " " . $hdfs_input_dir . " " . $hdfs_output_dir . " > " . $hadoop_log . " 2>&1";
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
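# For example, the call &hdfsCommand('rmr', $hdfs_input_dir) made in step 2
# executes something like:
#   hadoop fs -rmr "hdfs://<host>:<port>/user/<username>/gsdl/collect/<collection>/import" 2>&1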

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
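# Note: 'hadoop fs -test -d <path>' exits with status 0 when the path exists and
# is a directory, so the calls &hdfsTest('d', 0, ...) in step 2 evaluate to true
# when the corresponding HDFS directory already exists.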

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
#  *  Mirror a local directory tree into HDFS, issuing one 'put' per file and
#  *  returning the number of files copied.
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend the HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  *  Join URL components with '/'; the first argument is expected to already
#  *  carry any scheme, host, and port (e.g. the hdfs:// prefix).
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
#  *  Returns true (1) when the given directory does not exist or contains no
#  *  entries other than '.' and '..'; false (0) otherwise.
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return (@files ? 0 : 1);
}
# /** dirIsEmpty() **/