root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 28015

Revision 28015, 15.9 KB (checked in by jmt12, 7 years ago)

Add an extra option that allows me to pass in the directory to write log files to, and add extra check when copying logs (by *.*) that the log directory isn't empty

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
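# For example (illustrative - adjust paths to your own install), the expected
# environment comes from sourcing the main and extension setup scripts before
# running this script:
#   source <gsdl>/setup.bash
#   source <gsdl>/ext/parallel_processing/setup.bash
# which should export GSDLHOME, GSDLOS, GEXTPARALLELBUILDING,
# GEXTPARALLELBUILDING_INSTALLED, HDFSHOST and HDFSPORT as checked above.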

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
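# Note (assumption): the `rocks` command is only expected to exist on a Rocks
# cluster head node, so a zero exit status from the probe above is used as a
# simple test for whether we are running on such a cluster.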

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-logdir <dir>]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  print STDERR "       [-logdir <dir>] write log files to the given directory\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  if ($ARGV[$offset] eq '-logdir')
  {
    $offset++;
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}
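# Illustrative invocation (collection name and log directory are examples only):
#   hadoop_import.pl demo -start_thrift -refresh_import -logdir /tmp/gs-results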

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}
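# For example, with HDFSHOST=namenode, HDFSPORT=9000, user "gsuser" and the
# default collection "test" (all illustrative values), the paths built above
# come out as:
#   hdfs://namenode:9000/user/gsuser/gsdl/collect/test/import
#   /hdfs/user/gsuser/gsdl/collect/test/archives      (NFS-mounted variant)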

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of the import directory into HDFS,
  #   ensuring that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for media-based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}
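# Illustrative collect.cfg line matched by the grep/regex above:
#   infodbtype    tdbserver
# and the launcher output that the host and port are parsed from:
#   Server now listening on <host>:<port>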

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}
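# e.g. "hdfs://namenode:9000/user/gsuser/gsdl/collect/test/archives" becomes
# "HDFSShell://namenode:9000/user/gsuser/gsdl/collect/test/archives" (or the
# HDThriftFS:// form when thrift is in use) - host, port and user shown here
# are illustrative only.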

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
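# The assembled command ends up looking roughly like this (paths, host, port
# and user are illustrative only):
#   hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
#     "<GSDLHOME>" "<HADOOP_PREFIX>" test "<archives dir>" "HDFSShell://" \
#     "hdfs://namenode:9000/user/gsuser/gsdl/collect/test/import" \
#     "hdfs://namenode:9000/user/gsuser/gsdl/collect/test/archives" >> <results>/hadoop.log 2>&1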
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we better stop the thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
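# Example: &hdfsCommand('rmr', $hdfs_input_dir) runs
#   hadoop fs -rmr "<hdfs_input_dir>" 2>&1
# and returns the resulting exit status ($?).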

# /** @function hdfsTest
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
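# Example: &hdfsTest('d', 0, $hdfs_input_dir) runs `hadoop fs -test -d <dir>`
# and returns true when the exit status equals the expected value (0 here,
# i.e. the directory exists).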

# /**
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##
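# Example: my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
# mirrors the local import tree into HDFS (or, with -use_nfs, under /hdfs) and
# returns the number of files copied.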


# /** @function shellCommand
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/
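# Example: &shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');
# With -dry_run set, the command is only echoed via debugPrint (when -debug is
# also given) and an empty string is returned instead of running anything.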

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
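# Example: &urlCat('hdfs://namenode:9000', 'user', 'gsuser', 'gsdl')
# returns 'hdfs://namenode:9000/user/gsuser/gsdl' (values illustrative).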

# /**
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/
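# Example: &dirIsEmpty('/tmp/greenstone') returns 1 when the directory is
# missing or contains nothing but '.' and '..', and 0 otherwise.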


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##
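# Example: &recursiveDelete($gs_archives_dir, $gsdl_home);
# The $prefix check is a safety guard: the rm -rf only runs when the target
# path starts with the given prefix (e.g. the Greenstone tree or /hdfs).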