root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27058

Revision 27058, 8.8 KB (checked in by jmt12, 7 years ago)

Adding data locality report generation to Hadoop greenstone imports

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # may need to be a full path if 'hadoop' is not on the PATH
my $cluster_head = $ENV{'HDFSHOST'}; # assumes the HDFS host is also the cluster head; may not hold for advanced configurations
my $hdfs_fs_prefix = 'hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'};
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

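# - detect whether this is running on a Rocks cluster by checking for the
#   'rocks' command; when present, cleanup and log gathering are also run on
#   the compute nodes via 'rocks run host'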
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
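# - give each run its own timestamped results directory so earlier logs and
#   reports are not overwritten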
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
my $hdfs_input_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'archives');

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of the import directory into HDFS,
  #   ensuring that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  print " * Clearing existing archives directory for this collection... ";
  &hdfsCommand('rmr', $hdfs_output_dir);
  print "Done!\n";
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
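# - read the infodbtype setting from the collection's collect.cfg; only the
#   GDBMServer and TDBServer types are supported for Hadoop imports (checked
#   below)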
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multi-file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
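# - the arguments passed to HadoopGreenstoneIngest are: GSDLHOME, the HDFS
#   host and port, HADOOP_PREFIX, the collection name, and the HDFS input and
#   output directories; all output is captured in hadoop.log within the
#   results directory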
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $ENV{'HDFSHOST'} . '" ' . $ENV{'HDFSPORT'} . ' "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . " " .  $hdfs_input_dir . " " . $hdfs_output_dir . " > " . $hadoop_log . " 2>&1";
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
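#  * Print a message to STDOUT, prefixed with [Debug], but only when the
#  * global $debug flag is set.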
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
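#  * Run a 'hadoop fs' subcommand (mkdir, put, rmr, test, etc.) against the
#  * given paths and return its exit status.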
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
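#  * Run a 'hadoop fs -test' check (such as -d for directory existence) and
#  * return true when the exit status matches the expected value.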
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
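#  * @function printUsage
#  * Print a usage message for this script and exit.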
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/

# /**
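#  * @function recursiveCopy
#  * Recursively copy the contents of a local directory into HDFS, preserving
#  * relative paths, and return the number of files copied.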
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
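#  * Echo the given command when debugging, run it via backticks unless this
#  * is a dry run, and return any captured output.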
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
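#  * Join the given parts with '/' to form a URL (or HDFS path).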
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
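#  * @function dirIsEmpty
#  * Return true if the given directory does not exist or contains no entries
#  * other than '.' and '..'.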
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/