source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 28015

Last change on this file since 28015 was 28015, checked in by jmt12, 11 years ago

Add an extra option that allows me to pass in the directory to write log files to, and add an extra check when copying logs (by *.*) that the log directory isn't empty

  • Property svn:executable set to *
File size: 15.9 KB
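For orientation, a typical invocation of the script (assuming the Greenstone and parallel-building setup.bash files have been sourced, and that the named collection already exists under $GSDLHOME/collect) might look like the line below; the collection name and log directory are purely illustrative:

  hadoop_import.pl mycollection -refresh_import -start_thrift -logdir /tmp/hadoop-import-logs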
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

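# - detect whether we are on a Rocks cluster: the 'rocks' command only exists
#   on the head node of a Rocks cluster, so a zero exit status below means we
#   can later use 'rocks run host' to fan commands out to the compute nodes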
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  if ($ARGV[$offset] eq '-logdir')
  {
    $offset++;
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}
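# - $hdfs_fs_prefix now selects how the ingest will talk to HDFS: via the
#   Thrift server (HDThriftFS://), by shelling out to 'hadoop fs'
#   (HDFSShell://), or through an NFS mount of HDFS (/hdfs)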

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the OS disk cache too, so we are playing on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to Stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}
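# (presumably each ingest task looks for /tmp/greenstone/delay.me and pauses
#  briefly before starting, so the mappers don't all hit HDFS at the same
#  instant -- see HadoopGreenstoneIngest2 for the actual behaviour)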

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}
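# - for reference, the infodbtype line being grepped for in etc/collect.cfg
#   looks something like:
#       infodbtype    gdbmserver
#   (or tdbserver / stdoutxml)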

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}
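# - e.g. hdfs://<host>:<port>/user/<user>/gsdl/collect/<collection>/archives
#   becomes HDFSShell://<host>:<port>/user/... (or HDThriftFS://... when the
#   thrift driver is in use)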

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
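# - at this point the assembled command looks roughly like:
#     hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
#       "<gsdl_home>" "<hadoop_prefix>" <collection> "<archives dir>" \
#       "<fs prefix>" "<hdfs input dir>" "<hdfs output dir>" >> hadoop.log 2>&1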
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the thrift server(s) ourselves, so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
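  # - hdfsCommand() returns the exit status of 'hadoop fs -test ...', which is
  #   0 when the test succeeds, so callers pass 0 as $test_target to ask
  #   "does this path exist / is it a directory?"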
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
#     returns 1 if the given directory is missing or contains no entries
#     (other than . and ..), and 0 otherwise
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
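  # - safety guard: only delete the directory if it actually lies beneath the
  #   given prefix, so we never 'rm -rf' something outside the Greenstone or
  #   HDFS area we were pointed at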
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##