root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27654

Revision 27654, 15.4 KB (checked in by jmt12, 6 years ago)

Add the ability to stagger the start-up of Mappers by placing a 'delay.me' file in the tmp directory of each compute node that should be delayed. That node's Mappers are then initially delayed by (compute node number * 100) seconds.
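The Mapper-side half of this mechanism lives in the Hadoop ingest code rather than in this script, which only creates the marker files (see the -stagger handling below). A minimal sketch of the idea in Perl, assuming the compute node number is taken from the node's hostname (e.g. 'compute-0-3') and that the marker is the /tmp/greenstone/delay.me file touched by this script:

use strict;
use warnings;
use Sys::Hostname;

# Illustrative sketch only: stagger Mapper start-up when a 'delay.me' marker is present
if (-e '/tmp/greenstone/delay.me')
{
  my ($node_number) = (hostname() =~ /(\d+)$/);  # e.g. 'compute-0-3' gives 3
  $node_number = 0 unless (defined $node_number);
  sleep($node_number * 100);                     # node N waits N * 100 seconds
}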

  • Property svn:executable set to *
1#!/usr/bin/perl
2use strict;
3use warnings;
4
5# Requires setup.bash to have been sourced
6BEGIN
7{
8  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
9  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
10  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
11  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
12  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
13  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
14}
15
16print "===== Greenstone Import using Hadoop =====\n";
17
18# 0. Init
19my $collection = 'test';
20my $use_thrift = 1;
21my $start_thrift = 0;
22my $debug = 0;
23my $dry_run = 0;
24my $stagger = 0;
25my $flush_diskcache = 0;
26my $use_nfs = 0;
27
28my $gsdl_home = $ENV{'GSDLHOME'};
29my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
30my $hadoop_exe = 'hadoop'; # assumed to be on the PATH - prepend the full path here if necessary
31my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
32my $hdfs_fs_prefix = 'HDThriftFS://';
33my $refresh_import = 0;
34my $username = `whoami`;
35chomp($username);
36
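# - detect whether we are running on a Rocks cluster: the 'rocks' command exits
#   with status 0 when it is available, non-zero (or not found) otherwise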
37`rocks > /dev/null 2>&1`;
38my $is_rocks_cluster = ($? == 0);
39
40# 1. Read and validate parameters
41if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
42{
43  $collection = $ARGV[0];
44}
45else
46{
47  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-disable_thrift] [-dry_run] [-start_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs] [-stagger]\n";
48  print STDERR "where: [debug] print more debug messages to STDERR\n";
49  print STDERR "       [dry_run] don't actually perform an file actions\n";
50  exit;
51}
52my $offset = 1;
53while (defined $ARGV[$offset])
54{
55  if ($ARGV[$offset] eq '-debug')
56  {
57    $debug = 1;
58  }
59  if ($ARGV[$offset] eq '-disable_thrift')
60  {
61    $use_thrift = 0;
62  }
63  if ($ARGV[$offset] eq '-dry_run')
64  {
65    $dry_run = 1;
66  }
67  if ($ARGV[$offset] eq '-refresh_import')
68  {
69    $refresh_import = 1;
70  }
71  if ($ARGV[$offset] eq '-stagger')
72  {
73    $stagger = 1;
74  }
75  if ($ARGV[$offset] eq '-flush_diskcache')
76  {
77    $flush_diskcache = 1;
78  }
79  if ($ARGV[$offset] eq '-start_thrift')
80  {
81    $start_thrift = 1;
82  }
83  if ($ARGV[$offset] eq '-use_nfs')
84  {
85    $use_nfs = 1;
86  }
87  $offset++;
88}
89
90if (!$use_thrift)
91{
92  $hdfs_fs_prefix = 'HDFSShell://';
93}
94if ($use_nfs)
95{
96  $hdfs_fs_prefix = '/hdfs';
97}
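# - note: the prefix above selects how the Hadoop job reaches HDFS: HDThriftFS://
#   (via the Thrift server), HDFSShell:// (presumably by shelling out to
#   'hadoop fs'), or a plain /hdfs path when HDFS is mounted over NFS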
98
99my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
100my $gs_import_dir = $gs_collection_dir . '/import';
101if (!-d $gs_import_dir)
102{
103  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
104}
105my $gs_results_dir = $gs_collection_dir . '/results';
106if (!-d $gs_results_dir)
107{
108  mkdir($gs_results_dir, 0755);
109}
110$gs_results_dir .= '/' . time();
111if (!-d $gs_results_dir)
112{
113  mkdir($gs_results_dir, 0755);
114}
115# - directories within HDFS
116my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
117print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
118my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
119if ($use_nfs)
120{
121  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
122}
123my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
124print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
125my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
126if ($use_nfs)
127{
128  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
129}
130
131# 2. Copy the import directory into HDFS
132print " * Replicating import directory in HDFS...";
133# - check if import directory already exists
134my $hdfs_import_exists = 0;
135if ($use_nfs)
136{
137  if (-d $nfs_input_dir)
138  {
139    $hdfs_import_exists = 1;
140  }
141}
142else
143{
144  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
145}
146if ($refresh_import || !$hdfs_import_exists)
147{
148  # - clear out the old import directory
149  if ($hdfs_import_exists)
150  {
151    if ($use_nfs)
152    {
153      &recursiveDelete($nfs_input_dir, '/hdfs');
154    }
155    else
156    {
157      &hdfsCommand('rmr', $hdfs_input_dir);
158    }
159  }
160  # - now recursively copy the contents of import directory into HDFS ensuring
161  #   that relative paths are maintained
162  my $file_count = 0;
163  if ($use_nfs)
164  {
165    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
166  }
167  else
168  {
169    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
170  }
171  &debugPrint($file_count . " files 'putted'");
172  print "Done!\n";
173}
174else
175{
176  print "Already exists!\n";
177}
178
179# - clear out the archives regardless
180my $gs_archives_dir = $gs_collection_dir . '/archives';
181my $deleted_archives = 0;
182if (-e $gs_archives_dir)
183{
184  print " * Clearing existing archives directory for this collection... ";
185  &recursiveDelete($gs_archives_dir, $gsdl_home);
186  $deleted_archives = 1;
187}
188mkdir($gs_archives_dir, 0755);
189my $hdfs_archives_exists = 0;
190if ($use_nfs)
191{
192  if (-d $nfs_output_dir)
193  {
194    $hdfs_archives_exists = 1;
195  }
196}
197else
198{
199  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
200}
201if ($hdfs_archives_exists)
202{
203  if (!$deleted_archives)
204  {
205    print " * Clearing existing archives directory for this collection... ";
206  }
207  if ($use_nfs)
208  {
209    &recursiveDelete($nfs_output_dir, '/hdfs');
210  }
211  else
212  {
213    &hdfsCommand('rmr', $hdfs_output_dir);
214  }
215  $deleted_archives = 1;
216}
217if ($deleted_archives)
218{
219  print "Done!\n";
220}
221
222# - watch for cached directories for Media based collections
223my $gs_cached_dir = $gs_collection_dir . '/cached';
224if (-e $gs_cached_dir)
225{
226  print " * Clearing existing cached media directory for this collection... ";
227  &recursiveDelete($gs_cached_dir, $gsdl_home);
228  print "Done!\n";
229}
230
231# - clear out any old logs
232print " * Clearing existing logs for this collection... ";
233my $gs_logs_dir = $gs_collection_dir . '/logs';
234if (!&dirIsEmpty($gs_logs_dir))
235{
236  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
237}
238if (!&dirIsEmpty('/tmp/greenstone'))
239{
240  &shellCommand('rm -f /tmp/greenstone/*.*');
241  &shellCommand('rm -rf /tmp/gsimport*');
242  &shellCommand('rm -rf /tmp/thrift');
243}
244if ($is_rocks_cluster)
245{
246  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
247  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
248  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
249}
250print "Done!\n";
251
252# - flush the disk cache too, so every run starts on a level playing field
253if ($flush_diskcache)
254{
255  print " * Flushing disk cache... ";
256  &shellCommand('flush_caches.pl');
257  if ($is_rocks_cluster)
258  {
259    &shellCommand('rocks run host "flush_caches.pl"');
260  }
261  print "Done!\n";
262}
263
264# - If we've been asked to Stagger start-up, add "delay.me" files to the
265#   compute nodes
266if ($is_rocks_cluster && $stagger)
267{
268  &shellCommand('rocks run host "touch /tmp/greenstone/delay.me && chmod a+rw /tmp/greenstone/delay.me"');
269}
270
271# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
272#    where we start the server now to ensure it lives on the head node
273my $server_host = '';
274my $server_port = '';
275my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
276my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
277my $server_prefix = '';
278if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
279{
280  $server_prefix = uc($1);
281  print " * Starting " . $server_prefix . "Server... ";
282  # - start the server on the head node and retrieve the host and port from
283  #   the output
284  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
285  my $launcher_output = &shellCommand($launcher_command);
286  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
287  {
288    $server_host = $1;
289    $server_port = $2;
290    print "running on " . $server_host . ":" . $server_port . "\n";
291  }
292  else
293  {
294    print "Failed!\n";
295    exit;
296  }
297  # - use the client tool to add ourselves as a listener
298  print " * Registering as listener... ";
299  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
300  &shellCommand($client_command);
301  print "Done!\n";
302}
303else
304{
305  print "Error! True Hadoop processing is only available when Greenstone is\n";
306  print "       configured to use either GDBMServer or TDBServer.\n";
307  exit;
308}
309
310# 3.5 Start up the thrift server(s) if we've been asked to
311my $thrift_log = $gs_results_dir . '/thriftctl.log';
312if ($start_thrift)
313{
314  if ($is_rocks_cluster)
315  {
316    print " * Starting Thrift Servers (on compute nodes)... ";
317    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start" > "' . $thrift_log . '" 2>&1');
318  }
319  # single server
320  else
321  {
322    print " * Starting Thrift Server... ";
323    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start > "' . $thrift_log . '" 2>&1');
324  }
325  print "Done!\n";
326}
327
328my $actual_archives_dir;
329if ($use_nfs)
330{
331  $actual_archives_dir = $nfs_output_dir;
332}
333else
334{
335  $actual_archives_dir = $hdfs_output_dir;
336  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
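  # - swapping the hdfs:// scheme for the driver prefix means the Mappers write
  #   their archives through the same access mode (Thrift or shell) chosen above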
337}
338
339# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
340#    and allow the FileInputFormat to split it up into files to be processed
341#    in Greenstone. This works for collections with one file per document, like
342#    Lorem and ReplayMe, but might not work well with multiple file documents
343#    such as the Demo collection
344print " * Running import using Hadoop...";
345my $hadoop_log = $gs_results_dir . '/hadoop.log';
346&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
347my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
348$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
349$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
350$hadoop_command .= $collection . ' '; # The collection name
351$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
352$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
353$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
354$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
355$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
356&shellCommand($hadoop_command);
357print "Done!\n";
358
359# 5. If we ran *Server infodbs, we now need to shut them down
360if ($server_prefix ne '')
361{
362  print " * Deregistering as listener and shutting down... ";
363  # - deregister as a listener
364  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
365  &shellCommand($client_command1);
366  # - send quit command
367  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
368  &shellCommand($client_command2);
369  print "Done!\n";
370}
371
372# 5.5 If we started the Thrift server(s) ourselves, we had better stop them
373#     again now that the import has finished
374if ($start_thrift)
375{
376  if ($is_rocks_cluster)
377  {
378    print " * Stopping Thrift Servers (on compute nodes)... ";
379    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop" >> "' . $thrift_log . '" 2>&1 ');
380  }
381  # single server
382  else
383  {
384    print " * Stoping Thrift Server... ";
385    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start >> "' . $thrift_log . '" 2>&1');
386  }
387  print "Done!\n";
388}
389
390# 6. Gather logs
391print " * Gathering logs from compute nodes... ";
392# - local files
393if (!&dirIsEmpty('/tmp/greenstone'))
394{
395  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
396}
397if (-d $gs_collection_dir . '/logs')
398{
399  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
400}
401if ($start_thrift && -d '/tmp/thrift')
402{
403  &shellCommand('cp /tmp/thrift/*.log ' . $gs_results_dir);
404}
405# - remote files
406if ($is_rocks_cluster)
407{
408  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
409  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
410  if ($start_thrift)
411  {
412    &shellCommand('rocks run host "scp /tmp/thrift/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
413  }
414}
415print "Done!\n";
416
417# - generate data locality report...
418&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
419
420# - hadoop report...
421&shellCommand('hadoop_report.pl "' . $gs_results_dir . '"');
422
423# - and gantt chart
424&shellCommand('generate_gantt.pl -width 1600 "' . $gs_results_dir . '"');
425
426# 7. Done - clean up
427print " * Cleaning up temporary files... ";
428&shellCommand('rm -rf /tmp/greenstone');
429&shellCommand('rm -rf /tmp/gsimport*');
430if ($is_rocks_cluster)
431{
432  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
433  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
434}
435print "Done!\n";
436print "Complete!\n\n";
437
438exit;
439
440# /** @function debugPrint
441#  */
442sub debugPrint
443{
444  my $msg = shift(@_);
445  if ($debug)
446  {
447    print "[Debug] " . $msg . "\n";
448  }
449}
450# /** debugPrint() **/
451
452# /** @function hdfsCommand
453#  */
454sub hdfsCommand
455{
456  my $command = shift(@_);
457  my $paths = '"' . join('" "', @_) . '"';
458  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
459  &shellCommand($hdfs_command);
460  return $?;
461}
462# /** hdfsCommand() **/
463
464# /** @function hdfsTest
465#  */
466sub hdfsTest
467{
468  my $command = shift(@_);
469  my $test_target = shift(@_);
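  # - 'hadoop fs -test -d <path>' exits with status 0 when the path exists and is
  #   a directory, so callers pass in the wait status they expect (usually 0)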
470  my $result = &hdfsCommand('test -' . $command, @_);
471  return ($result == $test_target);
472}
473# /** hdfsTest() **/
474
475# /**
476#  */
477sub printUsage
478{
479  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
480  exit;
481}
482# /** printUsage() **/
483
484
485## @function recursiveCopy()
486#
487sub recursiveCopy
488{
489  my ($src_dir, $hdfs_dir) = @_;
490  my $file_count = 0;
491  # - create the directory in HDFS
492  if ($use_nfs)
493  {
494    &shellCommand('mkdir "' . $hdfs_dir . '"');
495  }
496  else
497  {
498    &hdfsCommand('mkdir', $hdfs_dir);
499  }
500  # - search $src_dir for files
501  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
502  my @files = readdir(DH);
503  closedir(DH);
504  foreach my $file (@files)
505  {
506    # - skip dot prefix files
507    if ($file !~ /^\./)
508    {
509      my $src_path = $src_dir . '/' . $file;
510      # - recurse directories, remembering to extend HDFS dir too
511      if (-d $src_path)
512      {
513        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
514        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
515      }
516      # - and use 'put' to copy files
517      else
518      {
519        my $hdfs_path = $hdfs_dir . '/' . $file;
520        if ($use_nfs)
521        {
522          &shellCommand('cp "' . $src_path . '" "' . $hdfs_path . '"');
523        }
524        else
525        {
526          &hdfsCommand('put', $src_path, $hdfs_path);
527        }
528        $file_count++;
529      }
530    }
531  }
532  return $file_count;
533}
534## recursiveCopy() ##
535
536
537# /** @function shellCommand
538#  */
539sub shellCommand
540{
541  my $cmd = shift(@_);
542  my $output = '';
543  &debugPrint($cmd);
544  if (!$dry_run)
545  {
546    $output = `$cmd`;
547  }
548  return $output;
549}
550# /** shellCommand() **/
551
552# /** @function urlCat
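#  Joins its arguments with '/'; e.g. &urlCat('hdfs://head:9000', 'user', 'bob')
#  returns 'hdfs://head:9000/user/bob'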
553#  */
554sub urlCat
555{
556  my $url = join('/', @_);
557  return $url;
558}
559# /** urlCat() **/
560
561# /**
562#  */
563sub dirIsEmpty
564{
565  my $dir = shift(@_);
566  my @files;
567  if (-e $dir)
568  {
569    opendir(DIR, $dir) or die $!;
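    # - ignore the '.' and '..' entries when deciding whether the directory is empty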
570    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
571    closedir(DIR);
572  }
573  return @files ? 0 : 1;
574}
575# /** dirIsEmpty() **/
576
577
578## @function recursiveDelete()
579#
580sub recursiveDelete
581{
582  my ($dir, $prefix) = @_;
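  # - only delete when $dir lies under the given prefix; this guards against
  #   accidentally removing paths outside the Greenstone tree or the /hdfs mount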
583  if ($dir =~ /^$prefix/)
584  {
585    &shellCommand('rm -rf "' . $dir . '"');
586  }
587}
588## recursiveDelete() ##