source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 28015

Last change on this file since 28015 was 28015, checked in by jmt12, 8 years ago

Add an extra option that allows me to pass in the directory to write log files to, and add an extra check when copying logs (by *.*) that the log directory isn't empty

  • Property svn:executable set to *
File size: 15.9 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

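# - detect whether we are running on a Rocks cluster by checking whether the
#   'rocks' command succeeds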
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
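  # - the -logdir flag consumes the next argument as the directory to write log
  #   files to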
  if ($ARGV[$offset] eq '-logdir')
  {
    $offset++;
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}

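# - select the filesystem prefix used when talking to HDFS: the Thrift driver,
#   the HDFS shell (when Thrift is disabled), or a local NFS mount of HDFS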
if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
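# - if no log directory was given on the command line, default to a timestamped
#   directory under the collection's 'results' directory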
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
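# - read the infodbtype setting from the collection configuration to decide how
#   the import's info database should be handled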
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

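# - work out the archives directory as the compute nodes will see it: either the
#   NFS mount of HDFS, or the HDFS URI rewritten to use the chosen driver prefix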
my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" ';            # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' ';                  # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" ';  # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" ';       # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" ';       # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" ';      # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1';     # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we better stop the thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
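#   (each source directory is checked first, so we don't try to copy *.* from an
#   empty or missing directory)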
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
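#  Recursively copy the contents of $src_dir into the given HDFS (or NFS)
#  directory, preserving relative paths, and return the number of files copied
#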
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
#  Returns 1 if the given directory does not exist or contains no entries other
#  than '.' and '..', and 0 otherwise
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
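  # - safety guard: only delete paths that fall within the expected prefix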
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##