root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27644

Revision 27644, 14.4 KB (checked in by jmt12, 6 years ago)

Extended to support HDFS access via NFS. This applies both to the call to Hadoop (which needed one extra argument giving the NFS-located archives directory) and to the import directory setup (writing to HDFS via NFS is much quicker than going through Java application calls).

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 0;
my $debug = 0;
my $dry_run = 0;
my $flush_diskcache = 0;
my $use_nfs = 0;

my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # assumed to be on the PATH; prepend the full path if necessary
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

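# - detect whether we are running on a Rocks cluster by checking the exit
#   status of the 'rocks' command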
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
  $collection = $ARGV[0];
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n\n";
  exit;
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  if ($ARGV[$offset] eq '-flush_diskcache')
  {
    $flush_diskcache = 1;
  }
  if ($ARGV[$offset] eq '-start_thrift')
  {
    $start_thrift = 1;
  }
  if ($ARGV[$offset] eq '-use_nfs')
  {
    $use_nfs = 1;
  }
  $offset++;
}

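# - select how HDFS will be accessed: via the Thrift server (HDThriftFS://),
#   via the 'hadoop fs' shell (HDFSShell://), or directly through an NFS
#   mount of HDFS (assumed below to be mounted at /hdfs)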
if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
  $hdfs_fs_prefix = '/hdfs';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
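#   (the import and archives directories live under
#   /user/<username>/gsdl/collect/<collection> in HDFS, and at the same path
#   under the /hdfs mount point when NFS access is enabled)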
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
  print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
  print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_input_dir)
  {
    $hdfs_import_exists = 1;
  }
}
else
{
  $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    if ($use_nfs)
    {
      &recursiveDelete($nfs_input_dir, '/hdfs');
    }
    else
    {
      &hdfsCommand('rmr', $hdfs_input_dir);
    }
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = 0;
  if ($use_nfs)
  {
    $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
  }
  else
  {
    $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  }
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}

# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &recursiveDelete($gs_archives_dir, $gsdl_home);
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
  if (-d $nfs_output_dir)
  {
    $hdfs_archives_exists = 1;
  }
}
else
{
  $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir);
}
if ($hdfs_archives_exists)
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  if ($use_nfs)
  {
    &recursiveDelete($nfs_output_dir, '/hdfs');
  }
  else
  {
    &hdfsCommand('rmr', $hdfs_output_dir);
  }
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}

# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &recursiveDelete($gs_cached_dir, $gsdl_home);
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
  &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
  &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
if ($flush_diskcache)
{
  print " * Flushing disk cache... ";
  &shellCommand('flush_caches.pl');
  if ($is_rocks_cluster)
  {
    &shellCommand('rocks run host "flush_caches.pl"');
  }
  print "Done!\n";
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
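  #   (the "#a:<pid>" message registers this process as a listener; matching
  #   "#r:<pid>" and "#q:<pid>" messages are sent later to deregister and to
  #   ask the server to quit once the import has finished)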
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the thrift server(s) if we've been asked to
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    print "[DEBUG]\n" . &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"') . "\n\n";
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start');
  }
  print "Done!\n";
}

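# - determine the archives directory as the Hadoop tasks will see it: the NFS
#   mount path when using NFS, otherwise the HDFS URL rewritten with the
#   chosen filesystem driver prefix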
my $actual_archives_dir;
if ($use_nfs)
{
  $actual_archives_dir = $nfs_output_dir;
}
else
{
  $actual_archives_dir = $hdfs_output_dir;
  $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
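# - assemble the Hadoop invocation; the arguments below are handed to
#   org.nzdl.gsdl.HadoopGreenstoneIngest in this order: Greenstone home,
#   Hadoop home, collection name, archives dir, HDFS driver prefix, HDFS
#   input dir and HDFS output dir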
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started the thrift server(s), so we had better stop them too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop"');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
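# Runs the given 'hadoop fs' subcommand against the supplied paths and
# returns the resulting exit status.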
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
#  */
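# Runs 'hadoop fs -test -<flag>' on the given path and returns true if the
# exit status matches the expected value (0 for success).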
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n";
  exit;
}
# /** printUsage() **/


## @function recursiveCopy()
#
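# Recursively mirrors a local directory into HDFS (via 'hadoop fs -put') or
# onto the NFS mount (via 'cp'), preserving relative paths, and returns the
# number of files copied. Dot-prefixed files and directories are skipped.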
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  if ($use_nfs)
  {
    &shellCommand('mkdir "' . $hdfs_dir . '"');
  }
  else
  {
    &hdfsCommand('mkdir', $hdfs_dir);
  }
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        if ($use_nfs)
        {
          &shellCommand('cp "' . $src_path . '" "' . $hdfs_path . '"');
        }
        else
        {
          &hdfsCommand('put', $src_path, $hdfs_path);
        }
        $file_count++;
      }
    }
  }
  return $file_count;
}
## recursiveCopy() ##


# /** @function shellCommand
#  */
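# Logs the command via debugPrint, runs it with backticks and returns its
# output; when -dry_run is set the command is not executed and an empty
# string is returned.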
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/


## @function recursiveDelete()
#
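# Deletes a directory tree with 'rm -rf', but only if the path starts with the
# given prefix; this guards against accidentally removing files outside the
# collection tree or the /hdfs mount.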
sub recursiveDelete
{
  my ($dir, $prefix) = @_;
  if ($dir =~ /^$prefix/)
  {
    &shellCommand('rm -rf "' . $dir . '"');
  }
}
## recursiveDelete() ##