root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 30354

Revision 30354, 16.0 KB (checked in by jmt12, 5 years ago)

Extending manifest v2 support to allow for directories to be listed in the manifest. Matched with changes in the Directory plugin to allow paths into systems like HDFS to be listed in the manifest.

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
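# For reference, a typical environment (values below are illustrative only)
# would be established by sourcing the extension's setup.bash, e.g.:
#   export HDFSHOST=localhost
#   export HDFSPORT=54310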

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 0;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # add a full path here if 'hadoop' is not on the PATH
my $java_library = 'HadoopGreenstoneIngest2';
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $remove_old = 0;
my $username = `whoami`;
chomp($username);
my $gs_results_dir = '';

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
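# ($is_rocks_cluster is true when the 'rocks' command exits with status 0,
#  i.e. we appear to be running on the head node of a Rocks cluster)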

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-enable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-removeold] [-logdir <dir>]\n";
  print STDERR "where: [-debug] prints more debug messages to STDERR\n";
  print STDERR "       [-dry_run] doesn't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-enable_thrift')
  {
    $use_thrift = 1;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  if ($ARGV[$offset] eq '-removeold')
  {
    $remove_old = 1;
  }
  if ($ARGV[$offset] eq '-logdir')
  {
    $offset++;
    $gs_results_dir = $ARGV[$offset];
  }
  $offset++;
}
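# Example invocation (collection name and log directory are illustrative):
#   hadoop_import.pl test -removeold -start_thrift -enable_thrift -logdir /tmp/gs-results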

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}
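# Note: the filesystem prefix selects the driver the ingest uses when talking
# to HDFS: 'HDThriftFS://' (via the Thrift server), 'HDFSShell://' (shell-based
# access), or a plain '/hdfs' path when HDFS is mounted over NFS.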

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
if ($gs_results_dir eq '')
{
  $gs_results_dir = $gs_collection_dir . '/results';
  if (!-d $gs_results_dir)
  {
    mkdir($gs_results_dir, 0755);
  }
  $gs_results_dir .= '/' . time();
}
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}
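# For a collection named 'test' imported by user 'jsmith', the directories
# above come out something like the following (host and port are illustrative):
#   hdfs://localhost:54310/user/jsmith/gsdl/collect/test/import
#   hdfs://localhost:54310/user/jsmith/gsdl/collect/test/archives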

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
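# Note: recursiveCopy() (defined below) walks the local import tree and issues
# one 'put' (or one 'cp' when using the NFS mount) per file, so the relative
# directory structure of the import directory is recreated inside HDFS.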

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}
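# For reference: the *Client.pl calls in this script pass "#a:<pid>" to register
# this process as a listener, "#r:<pid>" to deregister it, and "#q:<pid>" to ask
# the server to quit (see step 5 below).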

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}
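# i.e. the 'hdfs://' scheme on the archives path is replaced with the driver
# prefix chosen earlier, giving e.g. 'HDThriftFS://host:port/user/...' or
# 'HDFSShell://host:port/user/...' (forms shown are illustrative).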

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";
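# The assembled command therefore has the general shape (paths illustrative):
#   hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
#     "<gsdlhome>" "<hadoop prefix>" <collection> "<archives dir>" \
#     "<fs prefix>" "<hdfs import dir>" "<hdfs archives dir>" >> hadoop.log 2>&1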

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the thrift server(s), so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs' && !&dirIsEmpty($gs_collection_dir . '/logs'))
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
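# For example, &hdfsCommand('rmr', $hdfs_input_dir) ends up running something
# like: hadoop fs -rmr "hdfs://host:port/user/<user>/gsdl/collect/<collection>/import" 2>&1
# (path shown is illustrative) and returns the command's exit status.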

# /** @function hdfsTest
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
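# For example, &hdfsTest('d', 0, $path) runs 'hadoop fs -test -d <path>' and is
# true when the exit status matches the expected value (0, i.e. the directory
# exists).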

# /** @function printUsage
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-enable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger] [-removeold] [-logdir <dir>]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
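# For example, &urlCat('hdfs://localhost:54310', 'user', 'jsmith', 'import')
# returns 'hdfs://localhost:54310/user/jsmith/import' (arguments illustrative).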

# /** @function dirIsEmpty
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
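#  The $prefix argument acts as a safety guard: the 'rm -rf' is only issued
#  when $dir actually lies under the given prefix.
#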
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##