root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27594

Revision 27594, 12.0 KB (checked in by jmt12, 6 years ago)

Extend hadoop_import.pl to be able to start and stop the Thrift server(s)

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
  die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $start_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # may need a full path prepended
my $cluster_head = $ENV{'HDFSHOST'}; # may not hold for advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
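# - the backticks above discard output and only set $?: a zero exit status
#   means the 'rocks' command-line tool exists, which we take as evidence we
#   are on the front end of a Rocks cluster (and so can use 'rocks run host'
#   to fan commands out to the compute nodes later on)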

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  &printUsage();
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  elsif ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  elsif ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  elsif ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  elsif ($ARGV[$offset] eq '-start_thrift')
  {
    # - accepted for completeness; starting the Thrift server(s) is the default
    $start_thrift = 1;
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
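# - note: the 'hdfs://' form used above produces empty-authority URIs such as
#   hdfs:///user/<username>/gsdl/collect/<collection>/import, which Hadoop
#   should resolve against the default filesystem configured in core-site.xml;
#   the commented-out variants spell the namenode host and port out explicitly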

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files copied into HDFS");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  &hdfsCommand('rmr', $hdfs_output_dir);
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}
# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &shellCommand('rm -f ' . $gs_logs_dir . '/*.*');
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";

# - flush the disk cache too, so each run starts on a level playing field
print " * Flushing disk cache... ";
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}
print "Done!\n";

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
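# - the collect.cfg line being matched is expected to look something like
#   (for example):  infodbtype gdbmserver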
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 3.5 Start up the Thrift server(s) if we've been asked to
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Starting Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"');
  }
  # single server
  else
  {
    print " * Starting Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start');
  }
  print "Done!\n";
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with multiple
#    file documents such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
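# - the arguments handed to org.nzdl.gsdl.HadoopGreenstoneIngest are, in
#   order: the local Greenstone home, the filesystem prefix used to address
#   HDFS (HDThriftFS:// or HDFSShell://), the Hadoop install prefix, the
#   collection name, and the HDFS input (import) and output (archives) paths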
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' . $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 5.5 We started them - so we had better stop the Thrift servers too
if ($start_thrift)
{
  if ($is_rocks_cluster)
  {
    print " * Stopping Thrift Servers (on compute nodes)... ";
    &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop"');
  }
  # single server
  else
  {
    print " * Stopping Thrift Server... ";
    &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop');
  }
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
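# - for example, &hdfsCommand('rmr', $hdfs_input_dir) expands to something
#   like: hadoop fs -rmr "hdfs:///user/bob/gsdl/collect/test/import" 2>&1
#   (hypothetical path). The return value is the raw $? wait status left by
#   the backticks in shellCommand, so 0 indicates success; under -dry_run
#   nothing runs, so $? simply holds whatever the previous command set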

# /** @function hdfsTest
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
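# - e.g. &hdfsTest('d', 0, $hdfs_input_dir) runs 'hadoop fs -test -d' on the
#   path and returns true when the exit status matches the expected value
#   (0 here, meaning the directory exists)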

# /** @function printUsage
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import]\n\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/
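# - e.g. &recursiveCopy($gs_import_dir, $hdfs_input_dir) mirrors the local
#   import tree into HDFS with one 'put' per file, skips anything with a dot
#   prefix (including '.' and '..'), and returns the number of files copied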

# /** @function shellCommand
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/
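# - every external command in this script funnels through here: -debug echoes
#   the command line before running it, and -dry_run prints it without
#   executing, in which case the empty string is returned instead of output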

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
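# - a naive join with no normalisation, which is exactly what the HDFS URIs
#   above rely on: urlCat('hdfs://', 'user', $username) yields
#   'hdfs:///user/<username>' (the empty-authority form)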

# /** @function dirIsEmpty
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/
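# - note that a directory that does not exist is also reported as empty
#   (@files is never populated when the -e test fails), which suits the
#   callers above that only use this to decide whether to delete leftovers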