source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 30354

Last change on this file since 30354 was 30354, checked in by jmt12, 8 years ago

Extending manifest v2 support to allow directories to be listed in the manifest. Matched with changes in the Directory plugin to allow paths into systems like HDFS to be listed in the manifest.

  • Property svn:executable set to *
File size: 16.0 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # add an explicit path here if 'hadoop' is not on the PATH
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not hold for more advanced cluster configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $remove_old = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-enable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-removeold] [-logdir <dir>]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-enable_thrift')
  {
    $use_thrift = 1;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  if ($ARGV[$offset] eq '-removeold')
  {
    $remove_old = 1;
  }
  if ($ARGV[$offset] eq '-logdir')
  {
    $offset++;
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}
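# For illustration only (collection name and log directory are hypothetical),
# a typical invocation of this script might look like:
#
#   hadoop_import.pl demo -debug -enable_thrift -start_thrift -refresh_import -logdir /tmp/demo-results
#
# Any option not recognised by the loop above is silently ignored.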

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}
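# To summarise the prefix selection above: -enable_thrift keeps the default
# 'HDThriftFS://' driver prefix, plain HDFS access falls back to
# 'HDFSShell://' (presumably shelling out to 'hadoop fs'), and -use_nfs
# overrides both with the '/hdfs' NFS mount point.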

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of the import directory into HDFS,
  #   ensuring that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . ' files copied into HDFS');
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for media-based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - if we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
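  # (Illustrative note: the regex below expects the launcher to print a line
  # of the form "Server now listening on <host>:<port>", e.g.
  # "Server now listening on cluster-head.local:40064" -- the host name and
  # port shown here are hypothetical.)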
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}
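# (Illustrative example: a collect.cfg line such as "infodbtype gdbmserver"
# selects the GDBMServer branch above, "infodbtype tdbserver" selects
# TDBServer, and "infodbtype stdoutxml" takes the STDOUT branch; the actual
# collection configuration is site-specific.)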

# 3.5 Start up the Thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with
#    multiple-file documents such as the Demo collection.
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" ';            # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' ';                  # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" ';  # Collection archives dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" ';       # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" ';       # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" ';      # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1';     # Redirect to log
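# For illustration only (host, user, collection and install paths are
# hypothetical), the assembled command resembles:
#   hadoop jar /path/to/ext/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
#     "/opt/greenstone" "/opt/hadoop" demo \
#     "HDFSShell://namenode:9000/user/alice/gsdl/collect/demo/archives" "HDFSShell://" \
#     "hdfs://namenode:9000/user/alice/gsdl/collect/demo/import" \
#     "hdfs://namenode:9000/user/alice/gsdl/collect/demo/archives" >> /path/to/results/hadoop.log 2>&1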
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the Thrift server(s), so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
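# Illustrative usage of the two wrappers above: a call such as
#   &hdfsTest('d', 0, $hdfs_input_dir)
# runs 'hadoop fs -test -d <dir>' via hdfsCommand() and returns true when the
# command exits with status 0, i.e. when the directory exists in HDFS.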

# /**
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-enable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-removeold] [-logdir <dir>]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot-prefixed files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse into directories, remembering to extend the HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##
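# Illustrative call (directory names hypothetical): replicating a local import
# tree into HDFS while preserving relative paths, and reporting how many files
# were copied.
#
#   my $count = &recursiveCopy('/opt/greenstone/collect/demo/import',
#                              'hdfs://namenode:9000/user/alice/gsdl/collect/demo/import');
#   &debugPrint($count . ' files copied');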


# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
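# Example (values hypothetical):
#   &urlCat('hdfs://namenode:9000', 'user', 'alice', 'gsdl', 'collect', 'demo', 'import')
# returns 'hdfs://namenode:9000/user/alice/gsdl/collect/demo/import'.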

# /**
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##
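# Illustrative note on the guard above: the delete only proceeds when $dir
# begins with the supplied $prefix, so a call such as
#   &recursiveDelete($gs_archives_dir, $gsdl_home);
# can only remove paths under the Greenstone installation, never an arbitrary
# location passed in by mistake.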