source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@27584

Last change on this file since 27584 was 27584, checked in by jmt12, 11 years ago

I wasn't doing -r when attempting to clear directories left in /tmp by video processing

  • Property svn:executable set to *
File size: 10.7 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

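# - determine whether we are running on a Rocks cluster by checking whether
#   the 'rocks' command is available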
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n\n";
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

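# Example invocation (assuming setup.bash has been sourced and a collection
# named 'demo' exists under $GSDLHOME/collect):
#   ./hadoop_import.pl demo -refresh_import -debug
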
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
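# - note: with no host:port these 'hdfs://' URLs resolve against Hadoop's
#   default filesystem, presumed here to be the HDFSHOST/HDFSPORT namenode
#   configured in setup.bash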

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  &hdfsCommand('rmr', $hdfs_output_dir);
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}
# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &shellCommand('rm -f ' . $gs_logs_dir . '/*.*');
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm -f /tmp/greenstone/*.*');
  &shellCommand('rm -rf /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
print " * Flushing disk cache... ";
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}
print "Done!\n";

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
# where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
# and allow the FileInputFormat to split it up into files to be processed
# in Greenstone. This works for collections with one file per document, like
# Lorem and ReplayMe, but might not work well with multiple file documents
# such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:$HDFSHOST" > ' . $hadoop_log);
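# - the HadoopGreenstoneIngest job is passed, in order: the Greenstone home
#   directory, the HDFS filesystem prefix, the Hadoop prefix, the collection
#   name, and the import (input) and archives (output) directories within HDFS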
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' . $hdfs_input_dir . '" "' . $hdfs_output_dir . '" >> ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}

# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
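# *  Runs 'hadoop fs -<command>' over the given HDFS path(s) and returns the
# *  resulting child status ($?).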
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
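# *  Wraps hdfsCommand('test -<flag>', ...) and returns true when the status
# *  matches the expected value.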
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /**
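# *  Prints a usage message and exits.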
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n";
  exit;
}
# /** printUsage() **/

# /**
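# *  Recursively copies the contents of a local directory into HDFS using
# *  'put', preserving relative paths, and returns the number of files copied.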
# */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
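# *  Logs the given command via debugPrint and, unless this is a dry run,
# *  executes it via backticks and returns the captured output.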
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
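# *  Joins its arguments into a URL path using '/' as the separator.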
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
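# *  Returns true if the given directory does not exist or contains no entries
# *  other than '.' and '..'.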
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/