root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27732

Revision 27732, 15.5 KB (checked in by jmt12, 6 years ago)

Nice the copy itself too

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
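# For reference, setup.bash is expected to export values along these lines
# (the host and port shown here are purely illustrative):
#   export HDFSHOST=localhost
#   export HDFSPORT=54310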

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $stagger = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # assumes 'hadoop' is on the PATH; use a full path otherwise
my $cluster_head = $ENV{'HDFSHOST'}; # may not be the head node on advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
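# ('rocks' only exists on Rocks cluster nodes, so a zero exit status is taken
#  to mean this machine is part of a Rocks cluster)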

# 1. Read and validate parameters
print 'Options: ' . join(' ', @ARGV) . "\n";
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
  print ' collection: ' . $collection . "\n";
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  print STDERR "where: [-debug] print more debug messages to STDERR\n";
  print STDERR "       [-dry_run] don't actually perform any file actions\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-stagger')
  {
    $stagger = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  $offset++;
}
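# (Note: options not listed above are silently ignored by the loop.)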

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}
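# The prefix tells the ingest which driver to use when talking to HDFS:
# HDThriftFS:// (via the Thrift server), HDFSShell:// (shelling out to
# 'hadoop fs'), or a plain /hdfs path when HDFS is NFS-mounted.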

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# - If we've been asked to stagger start-up, add "delay.me" files to the
#   compute nodes
if ($is_rocks_cluster && $stagger)
{
  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
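  # (The launcher is expected to report a line of the form
  #  "Server now listening on <host>:<port>"; that line is parsed below to
  #  recover the server's host and port.)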
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
my $thrift_log = $gs_results_dir . '/thriftctl.log';
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start > "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}
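# e.g. hdfs://head:54310/user/jmt12/gsdl/collect/test/archives becomes
#      HDThriftFS://head:54310/... (or HDFSShell://... when thrift is disabled);
#      the host, port and user shown here are illustrative only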

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection.
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archives dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect all output to the log
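# The assembled command has roughly this shape (paths, host and port illustrative):
#   hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#     "<gsdlhome>" "<hadoop_prefix>" <collection> "<archives dir>" "HDThriftFS://" \
#     "hdfs://<host>:<port>/user/<user>/gsdl/collect/<collection>/import" \
#     "hdfs://<host>:<port>/user/<user>/gsdl/collect/<collection>/archives" >> hadoop.log 2>&1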
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we had better stop the Thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop >> "' . $thrift_log . '" 2>&1');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
if ($start_thrift && -d '/tmp/thrift')
{
  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  if ($start_thrift)
  {
    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
  }
}
print "Done!\n";

# - generate data locality report...
if (!$use_nfs && !$use_thrift)
{
  &shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
}

# - hadoop report...
&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');

# - and gantt chart
&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
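# e.g. &hdfsCommand('rmr', $hdfs_input_dir) runs (host, port and user illustrative):
#   hadoop fs -rmr "hdfs://<host>:<port>/user/<user>/gsdl/collect/<collection>/import" 2>&1
# and returns the shell status ($?) of that command.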

# /** @function hdfsTest
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
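# e.g. &hdfsTest('d', 0, $hdfs_input_dir) is true when 'hadoop fs -test -d <dir>'
# exits with status 0, i.e. when the directory already exists in HDFS.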

# /**
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('nice -n 5 cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##
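# e.g. &recursiveCopy($gs_import_dir, $hdfs_input_dir) mirrors the local import
# tree into HDFS one file at a time (via 'hadoop fs -put', or a niced 'cp' when
# using the NFS mount) and returns the number of files copied.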


# /** @function shellCommand
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
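# e.g. &urlCat('hdfs://host:54310', 'user', 'jmt12', 'import') returns
#      'hdfs://host:54310/user/jmt12/import' (values illustrative)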

# /**
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##