source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@27550

Last change on this file was r27550, checked in by jmt12, 11 years ago

Ensure the hostname is added to the Hadoop logs so we can identify the compute node (maybe)

  • Property svn:executable set to *
File size: 10.7 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

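# - detect whether we are running on a Rocks cluster by checking that the
#   'rocks' command runs successfully ($? == 0)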
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import] \n\n";
}
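# - note: with no collection argument the script falls back to the default
#   'test' collection set above; an example invocation (assuming a collection
#   named 'demo') would be:
#     hadoop_import.pl demo -refresh_import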
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}

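# - without thrift we fall back to the shell-based HDFS file system prefix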
if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  &hdfsCommand('rmr', $hdfs_output_dir);
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}
# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &shellCommand('rm ' . $gs_logs_dir . '/*.*');
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
  &shellCommand('rm /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm /tmp/gsimport*"');
}
print "Done!\n";

# - flush caches too, so we are playing on a level field
print " * Flushing disk cache... ";
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}
print "Done!\n";

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection.
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
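# - the job jar is handed GSDLHOME, the HDFS file system prefix, HADOOP_PREFIX,
#   the collection name, and the HDFS input/output directories; all output is
#   appended to hadoop.log in the results directory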
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' . $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
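# Runs 'hadoop fs -<command>' against the given (quoted) paths and returns the
# shell exit status of the call.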
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
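# Runs 'hadoop fs -test -<flag> <path>' and returns true when the exit status
# matches the expected value supplied by the caller (0 when the test succeeds).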
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n";
  exit;
}
# /** printUsage() **/

# /**
# */
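# Recursively copies the contents of a local directory into HDFS with
# 'hadoop fs -put', preserving relative paths, and returns the number of files
# copied. Dot-prefixed entries are skipped.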
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
# */
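# Echoes the command via debugPrint and, unless this is a dry run, executes it
# with backticks; returns whatever output the command produced.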
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
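# Joins its arguments with '/' to build a URL or HDFS path.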
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
# */
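# Returns true when the given directory does not exist or contains no entries
# other than '.' and '..'.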
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/