source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@27594

Last change on this file since 27594 was 27594, checked in by jmt12, 11 years ago

Extend hadoop_import.pl to be able to start and stop the Thrift server(s)

  • Property svn:executable set to *
File size: 12.0 KB
#!/usr/bin/perl
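# hadoop_import.pl -- runs a Greenstone collection import as a Hadoop job:
# the collection's import directory is replicated into HDFS, the
# HadoopGreenstoneIngest job is run over it, and the resulting logs are
# gathered into a timestamped results directory.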
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFSHOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFSPORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # may need to be a full path if 'hadoop' is not on PATH
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
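# - a zero exit status from the 'rocks' command-line tool means we are running
#   on a Rocks cluster, so cleanup and Thrift commands are also issued to the
#   compute nodes below via 'rocks run host'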

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import]\n\n";
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}
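# - note that any other argument (including -start_thrift, which is already
#   the default) is silently ignored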

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
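# - the filesystem prefix is passed through to the Hadoop job and selects how
#   the ingest tasks reach HDFS: HDThriftFS:// (via the Thrift server) or
#   HDFSShell:// (via the 'hadoop fs' client)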

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
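# - e.g. hdfs:///user/<username>/gsdl/collect/<collection>/archives; the
#   commented-out forms above name the HDFS host and port explicitly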

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  &hdfsCommand('rmr', $hdfs_output_dir);
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}
# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &shellCommand('rm -f ' . $gs_logs_dir . '/*.*');
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
print " * Flushing disk cache... ";
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}
print "Done!\n";

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}
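# - the *Client.pl commands used here and in step 5 are: "#a:<pid>" to
#   register this process as a listener, "#r:<pid>" to deregister it, and
#   "#q:<pid>" to ask the server to quit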

# 3.5 Start up the Thrift server(s) if we've been asked to
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start');
  }
  print "Done!\n";
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with multiple
#    file documents such as the Demo collection.
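# - the command constructed below has the form:
#     hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#       <gsdlhome> <hdfs prefix> <hadoop prefix> <collection> <hdfs input dir> <hdfs output dir>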
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' . $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the Thrift server(s), so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop"');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
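# - e.g. &hdfsTest('d', 0, $hdfs_input_dir) is true when the directory exists
#   in HDFS, since 'hadoop fs -test -d' exits with status 0 on success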

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import]\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
# */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/
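# - e.g. &recursiveCopy($gs_import_dir, $hdfs_input_dir) mirrors the local
#   import tree into HDFS and returns the number of files copied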

# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/