source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 28015

Last change on this file since 28015 was 28015, checked in by jmt12, 11 years ago

Add an extra option that allows me to pass in the directory to write log files to, and add an extra check when copying logs (by *.*) that the log directory isn't empty

  • Property svn:executable set to *
File size: 15.9 KB
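Example invocation using the new -logdir option (illustrative only; it assumes the parallel-building extension's setup.bash has been sourced so that hadoop_import.pl and hadoop are on the PATH, and the paths shown are placeholders):

  hadoop_import.pl test -start_thrift -logdir /tmp/gs-hadoop-logs

If -logdir is not given, the script falls back to writing logs and reports into <collection>/results/<timestamp>, as before.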
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  if ($ARGV[$offset] eq '-logdir')
  {
    $offset++;
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}
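# Worked example of what urlCat() produces above (added for clarity; the host
# 'gc0', port '54310' and user 'jmt12' are placeholder values, not taken from
# any real configuration):
#   $hdfs_input_dir  => hdfs://gc0:54310/user/jmt12/gsdl/collect/test/import
#   $nfs_input_dir   => /hdfs/user/jmt12/gsdl/collect/test/import
#   $hdfs_output_dir => hdfs://gc0:54310/user/jmt12/gsdl/collect/test/archives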

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" ';            # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' ';                  # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" ';  # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" ';       # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" ';       # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" ';      # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1';     # Redirect to log
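# For reference only (added for clarity): with placeholder values - host 'gc0',
# port '54310', user 'jmt12', collection 'test' and the default HDThriftFS://
# driver, none of which come from a real configuration - the command assembled
# above comes out roughly as:
#   hadoop jar $GEXTPARALLELBUILDING_INSTALLED/lib/hadoop-greenstone.jar \
#     org.nzdl.gsdl.HadoopGreenstoneIngest2 "$GSDLHOME" "$HADOOP_PREFIX" test \
#     "HDThriftFS://gc0:54310/user/jmt12/gsdl/collect/test/archives" "HDThriftFS://" \
#     "hdfs://gc0:54310/user/jmt12/gsdl/collect/test/import" \
#     "hdfs://gc0:54310/user/jmt12/gsdl/collect/test/archives" >> <results dir>/hadoop.log 2>&1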
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the thrift server(s) - so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
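# Usage note (added for clarity, not in the original): &hdfsTest('d', 0, $hdfs_input_dir)
# runs 'hadoop fs -test -d <dir>' and compares the resulting exit status against 0,
# so it returns true when the directory already exists in HDFS.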

# /**
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  exit;
}
# /** printUsage() **/

## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##
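# Usage note (added for clarity, not in the original): the call
# &recursiveCopy($gs_import_dir, $hdfs_input_dir) in step 2 mirrors the local
# import/ tree into HDFS, one 'hadoop fs -put' per file (or one 'cp' per file
# in NFS mode), and returns the number of files copied.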


# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/
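# Usage note (added for clarity, not in the original): &dirIsEmpty('/tmp/greenstone')
# returns 1 when the directory is missing or holds nothing besides '.' and '..',
# and 0 otherwise, which is why the log-gathering steps guard their 'cp' calls with it.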


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##