source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27913

Last change on this file since 27913 was 27913, checked in by jmt12, 11 years ago

Made the ingester to be used configurable (version 1, which has no reduce phase, or version 2, which includes one)

  • Property svn:executable set to *
File size: 15.7 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may need to provide the full path
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

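# - detect whether this machine is part of a Rocks cluster: if the 'rocks'
#   command exits with status 0 we can use 'rocks run host' to reach the
#   compute nodes later on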
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  print STDERR "where: [-debug] print extra debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  $offset++;
}

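# - choose the filesystem driver prefix used to talk to HDFS: the Thrift
#   driver by default, the HDFS shell when Thrift is disabled, or a plain
#   /hdfs path when HDFS is mounted via NFS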
if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so each run starts on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to Stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
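# - read the infodbtype setting from the collection's collect.cfg so we know
#   which kind of info database the build will write to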
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print " configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

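# - work out which archives path the Hadoop tasks should write to: the NFS
#   mount point, or the HDFS URL rewritten to use the chosen filesystem
#   driver prefix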
my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with
#    multiple-file documents such as the Demo collection.
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
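# - record the HDFS host at the top of the Hadoop log, then assemble and run
#   the HadoopGreenstoneIngest job from hadoop-greenstone.jar with the
#   arguments described below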
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" ';            # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' ';                  # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" ';  # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" ';       # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" ';       # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" ';      # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1';     # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we had better stop the Thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
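  # - run 'hadoop fs -<command>' over the quoted paths and return the exit
  #   status of the call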
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
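  # - compare the exit status of 'hadoop fs -test -<flag>' against the
  #   expected value (0 means the test succeeded)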
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
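  # - in dry_run mode the command is not executed (use -debug to see what
  #   would have been run)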
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
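  # - a non-existent directory is treated as empty; '.' and '..' entries are
  #   ignored when counting the contents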
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
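  # - only delete when the target path lies under the given prefix, as a
  #   guard against removing files outside the expected tree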
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##