root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27001

Revision 27001, 8.7 KB (checked in by jmt12, 7 years ago)

Passing more environment variables (HADOOPPREFIX, HDFSHOST, HDFSPORT) through to hadoop (and thus on to the compute nodes). Added debug comments. Added a directory-not-empty test before attempting to copy log files, preventing non-fatal errors. Whether we are on a cluster is now determined by the presence of the 'rocks' executable rather than a hardcoded hostname (one less thing for me to forget to change between computers).

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'};
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

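# - determine whether we are on a Rocks cluster by checking for the 'rocks'
#   executable: $? is zero only if the shell could find and run it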
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
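# - timestamp each run so successive imports don't overwrite earlier results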
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
my $hdfs_input_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'archives');

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  print " * Clearing existing archives directory for this collection... ";
  &hdfsCommand('rmr', $hdfs_output_dir);
  print "Done!\n";
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
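# - read the infodbtype directive from the collection's configuration to
#   determine which database backend is in use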
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
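  # - our process ID ($$) is passed along so the server can identify this
  #   script when it later registers (and deregisters) as a listener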
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with multiple
#    file documents such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
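# - the ingest job is handed, in order: the Greenstone home, the HDFS host
#   and port, the Hadoop prefix, the collection name, and the input and
#   output directories within HDFS; all output is captured in hadoop.log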
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest'
                   . ' "' . $gsdl_home . '"'
                   . ' "' . $ENV{'HDFSHOST'} . '"'
                   . ' ' . $ENV{'HDFSPORT'}
                   . ' "' . $ENV{'HADOOP_PREFIX'} . '"'
                   . ' ' . $collection
                   . ' ' . $hdfs_input_dir
                   . ' ' . $hdfs_output_dir
                   . ' > ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  *  Print a message, but only when debugging is enabled.
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  *  Run a 'hadoop fs' subcommand against the given HDFS paths and return
#  *  the command's exit status.
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
#  *  Apply a 'hadoop fs -test' flag (e.g. -d) to a path and compare the
#  *  resulting exit status against the expected value.
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /** @function printUsage
#  *  Print a usage message and exit.
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
#  *  Copy the contents of a local directory into HDFS, preserving relative
#  *  paths, and return the number of files copied.
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
#  *  Run a shell command (unless this is a dry run) and return its output.
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  *  Join the given parts into a URL using '/' as the separator.
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
#  *  Return 1 if the given directory is missing or contains no entries
#  *  (other than '.' and '..'), and 0 otherwise.
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/