source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27654

Last change on this file since 27654 was 27654, checked in by jmt12, 11 years ago

Add the ability to stagger the starting of Mappers by placing a 'delay.me' file in the tmp directory of each compute node to be delayed. Mappers on that node will then be initially delayed by (compute node number * 100) seconds.

  • Property svn:executable set to *
File size: 15.4 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}
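
# A typical invocation on the cluster head node might look like the following
# (the collection name and chosen flags are purely illustrative):
#
#   hadoop_import.pl test -start_thrift -refresh_import -debug
#
# which re-copies the 'test' collection's import directory into HDFS, starts
# (and later stops) the Thrift servers, and prints extra debug output.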

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}
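
# With, say, HDFSHOST=gc0, HDFSPORT=54310, user 'gsuser' and the 'test'
# collection (all illustrative values), the directories above resolve to:
#   hdfs://gc0:54310/user/gsuser/gsdl/collect/test/import
#   hdfs://gc0:54310/user/gsuser/gsdl/collect/test/archives
# and, when -use_nfs is given, to /hdfs/user/gsuser/gsdl/collect/test/... instead.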

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
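
# (In HDFS mode the copy above is performed with individual 'hadoop fs -mkdir'
#  and 'hadoop fs -put' calls, one per directory and file, via recursiveCopy()
#  and hdfsCommand() below; in NFS mode plain mkdir/cp are used instead.)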

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}
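
# (According to the change log for this revision, Mappers that find a
#  /tmp/greenstone/delay.me file on their compute node delay their start by
#  roughly compute-node-number * 100 seconds, staggering cluster start-up.)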

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}
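
# (For instance, with infodbtype set to 'tdbserver' this runs
#  'TDBServer.pl <pid> <collection>' and expects a line such as
#  "Server now listening on gc0.local:37000" in its output -- the host and
#  port shown here are purely illustrative.)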

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}
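
# (e.g. hdfs://gc0:54310/user/gsuser/gsdl/collect/test/archives becomes
#  HDThriftFS://gc0:54310/... when thrift is enabled, or
#  HDFSShell://gc0:54310/... with -disable_thrift -- host, port and user
#  are illustrative.)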

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
$hadoop_command .= '"' . $gsdl_home . '" ';            # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' ';                  # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" ';  # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" ';       # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" ';       # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" ';      # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1';     # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";
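
# (The assembled command therefore looks roughly like the following, with the
#  paths, host and collection name all illustrative:
#
#  hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#    "/home/gsuser/greenstone2" "/opt/hadoop" test \
#    "HDThriftFS://gc0:54310/user/gsuser/gsdl/collect/test/archives" \
#    "HDThriftFS://" \
#    "hdfs://gc0:54310/user/gsuser/gsdl/collect/test/import" \
#    "hdfs://gc0:54310/user/gsuser/gsdl/collect/test/archives" \
#    >> <results_dir>/hadoop.log 2>&1 )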

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we'd better stop the thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
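
# (For example, &hdfsCommand('rmr', $hdfs_input_dir) builds and runs the shell
#  command  hadoop fs -rmr "<hdfs_input_dir>" 2>&1  via shellCommand() and
#  returns the exit status left in $?.)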

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
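
# (For example, &hdfsTest('d', 0, $hdfs_output_dir) is true when
#  'hadoop fs -test -d' exits with status 0, i.e. when the directory already
#  exists in HDFS.)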

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
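
# (For example, &urlCat('hdfs://gc0:54310', 'user', 'gsuser', 'gsdl') yields
#  'hdfs://gc0:54310/user/gsuser/gsdl' -- host, port and user are illustrative.)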

# /** @function dirIsEmpty
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return (@files ? 0 : 1);
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##
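
# (The $prefix argument acts as a safety guard: the 'rm -rf' is only issued
#  when the target path begins with the given prefix, e.g. $gsdl_home or
#  '/hdfs', so accidental deletes outside those trees are avoided.)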