root/gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl @ 27584

Revision 27584, 10.7 KB (checked in by jmt12, 6 years ago)

I wasn't doing -r when attempting to clear directories left in /tmp by video processing

  • Property svn:executable set to *
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # add a full path here if 'hadoop' is not on the PATH
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

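# - detect whether we are running on a Rocks cluster: if the 'rocks' command
#   runs successfully $? will be zero; anything else (including command not
#   found) leaves it non-zero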
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  &printUsage();
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
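# - note: with no host:port given, these URLs resolve against Hadoop's
#   configured default filesystem, e.g. (illustrative)
#   hdfs:///user/<username>/gsdl/collect/<collection>/import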

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  &hdfsCommand('rmr', $hdfs_output_dir);
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}
# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &shellCommand('rm -f ' . $gs_logs_dir . '/*.*');
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
# - flush the disk cache too, so each run starts on a level playing field
print " * Flushing disk cache... ";
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}
print "Done!\n";

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with
#    multiple-file documents such as the Demo collection.
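# - the command assembled below expands to something like (paths illustrative):
#     hadoop jar <gsdl_hadoop_ext>/lib/hadoop-greenstone.jar \
#       org.nzdl.gsdl.HadoopGreenstoneIngest \
#       "<gsdl_home>" "<hdfs_fs_prefix>" "$HADOOP_PREFIX" <collection> \
#       "<hdfs_input_dir>" "<hdfs_output_dir>" >> <results_dir>/hadoop.log 2>&1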
216print " * Running import using Hadoop...";
217my $hadoop_log = $gs_results_dir . '/hadoop.log';
218&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
219my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' .  $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
220&shellCommand($hadoop_command);
221print "Done!\n";
222
223# 5. If we ran *Server infodbs, we now need to shut them down
224if ($server_prefix ne '')
225{
226  print " * Deregistering as listener and shutting down... ";
227  # - deregister as a listener
228  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
229  &shellCommand($client_command1);
230  # - send quit command
231  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
232  &shellCommand($client_command2);
233  print "Done!\n";
234}
235
236
237# 6. Gather logs
238print " * Gathering logs from compute nodes... ";
239# - local files
240if (!&dirIsEmpty('/tmp/greenstone'))
241{
242  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
243}
244if (-d $gs_collection_dir . '/logs')
245{
246  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
247}
248# - remote files
249if ($is_rocks_cluster)
250{
251  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
252&shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
253}
254print "Done!\n";
255# - generate data locality report
256&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');
257
258# 7. Done - clean up
259print " * Cleaning up temporary files... ";
260&shellCommand('rm -rf /tmp/greenstone');
261&shellCommand('rm -rf /tmp/gsimport*');
262if ($is_rocks_cluster)
263{
264  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
265  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
266}
267print "Done!\n";
268print "Complete!\n\n";
269
270exit;
271
# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
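#  *  Runs 'hadoop fs -<command> "<path>" ...' via shellCommand() and returns
#  *  the raw child exit status ($?). Note that during a dry run nothing is
#  *  executed, so the status returned is simply whatever $? already held.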
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
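#  *  Wraps 'hadoop fs -test -<flag> <path>' and compares the resulting status
#  *  against the expected value (0 when the test is expected to succeed).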
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
#  */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n";
  exit;
}
# /** printUsage() **/

# /**
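#  *  Recursively mirrors a local directory into HDFS: creates the target
#  *  directory, recurses into subdirectories (skipping dot-prefixed entries)
#  *  and 'put's each regular file. Returns the number of files copied.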
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
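#  *  Echoes the command when debugging and skips execution entirely during a
#  *  dry run; otherwise returns the command's captured standard output.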
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
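#  *  Joins its arguments with '/'. Passing 'hdfs://' as the first component
#  *  therefore yields 'hdfs:///...', a path on the default filesystem.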
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
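#  *  Returns 1 if the directory is missing or contains no entries other than
#  *  '.' and '..', and 0 otherwise.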
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/