#!/usr/bin/perl

use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in /ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
# - location of the parallel building extension (provides lib/hadoop-greenstone.jar)
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may need to specify the full path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configurations
my $hdfs_fs_prefix = 'hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'};
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);
# - detect whether we are running on a Rocks cluster (the 'rocks' command succeeds)
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS (of the form
#   hdfs://<host>:<port>/user/<username>/gsdl/collect/<collection>/import)
my $hdfs_input_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'archives');

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  print " * Clearing existing archives directory for this collection... ";
  &hdfsCommand('rmr', $hdfs_output_dir);
  print "Done!\n";
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar'
                   . ' org.nzdl.gsdl.HadoopGreenstoneIngest'
                   . ' "' . $gsdl_home . '" "' . $ENV{'HDFSHOST'} . '" ' . $ENV{'HDFSPORT'}
                   . ' "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection
                   . ' ' . $hdfs_input_dir . ' ' . $hdfs_output_dir
                   . ' > ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#  */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#  */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
# /** @function hdfsTest
#  */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /** @function printUsage
#  */
sub printUsage
{
  print "usage: hadoop_import.pl [<collection>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
#  */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
#  */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/
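__END__

=head1 NAME

hadoop_import.pl - run a Greenstone collection import as a Hadoop job

=head1 SYNOPSIS

  # 'demo' is an example collection name; the script defaults to 'test'
  # when no argument is given
  hadoop_import.pl [demo]

=head1 DESCRIPTION

Mirrors the collection's import directory into HDFS (under
/user/<username>/gsdl/collect/<collection>/import), starts the GDBMServer or
TDBServer on the head node, runs the org.nzdl.gsdl.HadoopGreenstoneIngest job
from hadoop-greenstone.jar, and finally gathers logs from the compute nodes
into the collection's results/<timestamp> directory.

Requires GSDLHOME, GSDLOS, HDFSHOST and HDFSPORT to be set, i.e. both
Greenstone's setup.bash and the parallel processing extension's setup.bash
must have been sourced beforehand.

=cut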