source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@27732

Last change on this file since 27732 was 27732, checked in by jmt12, 11 years ago

Nice the copy itself too

  • Property svn:executable set to *
File size: 15.5 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

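# Detect whether we are running on a Rocks cluster: if the 'rocks' command can
# be run successfully ($? == 0) then compute nodes are driven via 'rocks run host'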
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  print STDERR "where: [debug] print more debug messages to STDERR\n";
  print STDERR "       [dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  $offset++;
}

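# How the Hadoop tasks will reach HDFS: via the Thrift driver by default,
# via the 'hadoop fs' shell when thrift is disabled, or through an NFS
# mount of HDFS at /hdfs when -use_nfs is given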
if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - if we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
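#    The *Server process speaks a simple protocol via the matching *Client.pl
#    tool: "#a:<pid>" registers this script as a listener, "#r:<pid>" removes
#    it, and "#q:<pid>" asks the server to quit (see the calls below)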
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multi-file documents
#    such as the Demo collection.
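# The command assembled below ends up looking roughly like this (illustrative
# only; the actual paths come from the variables above):
#   hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#     "<GSDLHOME>" "<HADOOP_PREFIX>" <collection> "<archives dir>" \
#     "<fs prefix>" "<HDFS import dir>" "<HDFS archives dir>" >> <results dir>/hadoop.log 2>&1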
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
$hadoop_command .= '"' . $gsdl_home . '" ';            # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' ';                  # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" ';  # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" ';       # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" ';       # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" ';      # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1';     # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we had better stop the thrift server(s) too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
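#  Runs 'hadoop fs -<command>' over the given paths (each quoted) and returns
#  the exit status of the shell command (0 on success).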
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
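#  Runs 'hadoop fs -test -<flag>' (e.g. 'd' for "is a directory") over the given
#  paths and returns true when the exit status matches the expected $test_target.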
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
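#  Mirrors $src_dir into the given HDFS (or NFS-mounted) directory, recursing
#  into subdirectories so that relative paths are preserved, and returns the
#  number of files copied. Dot-prefixed files are skipped.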
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
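#  Logs the command via debugPrint and, unless this is a -dry_run, executes it
#  via backticks and returns its captured output.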
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
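#  Joins its arguments with '/' to build a URL or path; no escaping or
#  normalisation is performed.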
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
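#  dirIsEmpty: returns 1 if the given directory does not exist or contains no
#  entries other than '.' and '..', and 0 otherwise.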
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
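#  Removes the given directory tree, but only when its path starts with the
#  supplied prefix - a guard against deleting anything outside the collection
#  or HDFS mount.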
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##