root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27913

Revision 27913, 15.7 KB (checked in by jmt12, 7 years ago)

Made the ingester that is used configurable (version 1, without a reduce phase, or version 2, which includes a reduce phase)

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add a full path here if needed
my $java_library = 'HadoopGreenstoneIngest2';
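# Note: per the revision log, version 2 of the ingester includes a reduce phase
# while version 1 does not; presumably switching this to the version 1 class
# (without the trailing '2') selects the reduce-less ingester, but that class
# name is an assumption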
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

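# - detect whether this is a Rocks cluster: a zero exit status from the `rocks`
#   command below is taken to mean the Rocks tools are available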
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  print STDERR "where: [debug] print more debug messages to STDERR\n";
  print STDERR "       [dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  $offset++;
}
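# (any argument not matching one of the flags above is silently ignored)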

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}
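# - so the ingester reaches HDFS in one of three ways: via the Thrift server
#   (HDThriftFS://, the default), via the Hadoop shell (HDFSShell://), or
#   through what is presumably an NFS mount of HDFS at /hdfs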

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}
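# - in other words, within HDFS the collection lives under
#   /user/<username>/gsdl/collect/<collection>/{import,archives}, mirroring the
#   local layout under $GSDLHOME/collect/<collection>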

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level playing field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to Stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
elsif ($infodbtype =~ /stdoutxml/)
{
  print " * InfoDB messages will be written to STDOUT... Cool bananas!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.' . $java_library . ' ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
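# - for illustration only (paths, host and user are assumed), the assembled
#   command looks something like:
#     hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest2 \
#       "/opt/greenstone" "/opt/hadoop" test \
#       "HDThriftFS://namenode:9000/user/bob/gsdl/collect/test/archives" "HDThriftFS://" \
#       "hdfs://namenode:9000/user/bob/gsdl/collect/test/import" \
#       "hdfs://namenode:9000/user/bob/gsdl/collect/test/archives" >> hadoop.log 2>&1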
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we better stop the thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
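# - e.g. &hdfsTest('d', 0, $hdfs_input_dir) above is true when that directory
#   exists, since 'hadoop fs -test -d <path>' exits with status 0 in that case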

# /**
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
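# - e.g. &urlCat('hdfs://host:port', 'user', 'bob', 'gsdl') yields
#   'hdfs://host:port/user/bob/gsdl' (it simply joins its arguments with '/')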

# /**
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
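# Deletes $dir recursively, but only when it lies under $prefix, a guard
# against the 'rm -rf' being pointed somewhere outside the expected tree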
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##