source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27644

Last change on this file since 27644 was 27644, checked in by jmt12, 11 years ago

Extended to support HDFS-access via NFS. This applies to both the call to Hadoop (which needed one extra argument stating the NFS located archives dir) but also to the import directory setup stuff (much quicker to write to HDFS via NFS rather than Java application calls)

  • Property svn:executable set to *
File size: 14.4 KB
Line 
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
# Fail fast (at compile time) if the Greenstone / parallel-building
# environment has not been configured, since every path below depends on it.
BEGIN
{
 die "GSDLHOME not set\n" unless (defined $ENV{'GSDLHOME'} && $ENV{'GSDLHOME'} ne '');
 die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
 die "GEXTPARALLELBUILDING not set\n" unless defined $ENV{'GEXTPARALLELBUILDING'};
 die "GEXTPARALLELBUILDING_INSTALLED not set\n" unless defined $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
 die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
 die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
15
print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
# - option defaults; overridden by the command-line parsing below
my $collection = 'test';   # collection name (placeholder until parsed)
my $use_thrift = 1;        # talk to HDFS via the Thrift driver by default
my $start_thrift = 0;      # whether this script starts/stops Thrift servers
my $debug = 0;             # enables &debugPrint() output
my $dry_run = 0;           # echo shell commands (via debug) instead of running them
my $flush_diskcache = 0;   # flush OS disk caches before importing
my $use_nfs = 0;           # access HDFS through an NFS mount at /hdfs

# - environment (presence already validated in the BEGIN block)
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may add path
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
my $hdfs_fs_prefix = 'HDThriftFS://'; # filesystem-driver prefix handed to the Hadoop job
my $refresh_import = 0;    # force a fresh copy of import/ into HDFS
my $username = `whoami`;
chomp($username);

# Detect whether we are on a Rocks cluster: the 'rocks' command exits 0 when
# present, in which case several cleanup commands must also run on all
# compute nodes via 'rocks run host'.
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);
# 1. Read and validate parameters
# - the first argument must be the collection name (alphanumeric only)
if (defined $ARGV[0] && $ARGV[0] =~ /^[a-z0-9]+$/i)
{
 $collection = $ARGV[0];
}
else
{
 print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n\n";
 exit;
}
# - remaining arguments are optional flags. Use an elsif chain (each argument
#   can only match one flag) and warn about unrecognized arguments rather
#   than silently ignoring them, so a misspelled flag is noticed.
my $offset = 1;
while (defined $ARGV[$offset])
{
 my $argument = $ARGV[$offset];
 if ($argument eq '-debug')
 {
  $debug = 1;
 }
 elsif ($argument eq '-disable_thrift')
 {
  $use_thrift = 0;
 }
 elsif ($argument eq '-dry_run')
 {
  $dry_run = 1;
 }
 elsif ($argument eq '-refresh_import')
 {
  $refresh_import = 1;
 }
 elsif ($argument eq '-flush_diskcache')
 {
  $flush_diskcache = 1;
 }
 elsif ($argument eq '-start_thrift')
 {
  $start_thrift = 1;
 }
 elsif ($argument eq '-use_nfs')
 {
  $use_nfs = 1;
 }
 else
 {
  print STDERR "warning: unrecognized argument ignored: " . $argument . "\n";
 }
 $offset++;
}

# - select the filesystem-driver prefix: shell access when Thrift is
#   disabled, and a plain NFS mount point overrides both
if (!$use_thrift)
{
 $hdfs_fs_prefix = 'HDFSShell://';
}
if ($use_nfs)
{
 $hdfs_fs_prefix = '/hdfs';
}
91
# - local (GSDLHOME) collection directories; import/ must already exist
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
 die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
# - each run writes its logs into a fresh timestamped results subdirectory
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
 mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
 mkdir($gs_results_dir, 0755);
}
# - directories within HDFS, plus their NFS-mounted equivalents (under
#   /hdfs) used when -use_nfs was given
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
print "HDFS Import Directory: " . $hdfs_input_dir . "\n";
my $nfs_input_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'import');
if ($use_nfs)
{
 print "=> NFS Import Directory: " . $nfs_input_dir . "\n";
}
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
print "HDFS Archives Directory: " . $hdfs_output_dir . "\n";
my $nfs_output_dir = &urlCat('/hdfs', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
if ($use_nfs)
{
 print "=> NFS Archives Directory: " . $nfs_output_dir . "\n";
}
123
# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists (a plain -d test when HDFS is
#   NFS-mounted; otherwise 'hadoop fs -test -d', which is much slower)
my $hdfs_import_exists = 0;
if ($use_nfs)
{
 if (-d $nfs_input_dir)
 {
  $hdfs_import_exists = 1;
 }
}
else
{
 $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
}
if ($refresh_import || !$hdfs_import_exists)
{
 # - clear out the old import directory
 if ($hdfs_import_exists)
 {
  if ($use_nfs)
  {
   &recursiveDelete($nfs_input_dir, '/hdfs');
  }
  else
  {
   &hdfsCommand('rmr', $hdfs_input_dir);
  }
 }
 # - now recursively copy the contents of import directory into HDFS ensuring
 #   that relative paths are maintained
 my $file_count = 0;
 if ($use_nfs)
 {
  $file_count = &recursiveCopy($gs_import_dir, $nfs_input_dir);
 }
 else
 {
  $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
 }
 &debugPrint($file_count . " files 'putted'");
 print "Done!\n";
}
else
{
 print "Already exists!\n";
}
171
# - clear out the archives regardless: both the local archives directory and
#   the copy in HDFS (the Hadoop job repopulates them from scratch)
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
 print " * Clearing existing archives directory for this collection... ";
 &recursiveDelete($gs_archives_dir, $gsdl_home);
 $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
# - now the HDFS-side archives (NFS mount or Hadoop shell, as above)
my $hdfs_archives_exists = 0;
if ($use_nfs)
{
 if (-d $nfs_output_dir)
 {
  $hdfs_archives_exists = 1;
 }
}
else
{
 $hdfs_archives_exists = &hdfsTest('d', 0, $hdfs_output_dir)
}
if ($hdfs_archives_exists)
{
 # - only print the heading if the local-archives branch above didn't already
 if (!$deleted_archives)
 {
  print " * Clearing existing archives directory for this collection... ";
 }
 if ($use_nfs)
 {
  &recursiveDelete($nfs_output_dir, '/hdfs');
 }
 else
 {
  &hdfsCommand('rmr', $hdfs_output_dir);
 }
 $deleted_archives = 1;
}
if ($deleted_archives)
{
 print "Done!\n";
}
214
# - watch for cached directories for Media based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
 print " * Clearing existing cached media directory for this collection... ";
 &recursiveDelete($gs_cached_dir, $gsdl_home);
 print "Done!\n";
}

# - clear out any old logs, both in the collection and in the shared /tmp
#   scratch areas (and on every compute node when on a Rocks cluster)
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
 &recursiveDelete($gs_logs_dir . '/*.*', $gsdl_home);
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
 &shellCommand('rm -f /tmp/greenstone/*.*');
 &shellCommand('rm -rf /tmp/gsimport*');
 &shellCommand('rm -rf /tmp/thrift');
}
if ($is_rocks_cluster)
{
 &shellCommand('rocks run host "rm -f /tmp/greenstone/*.*"');
 &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
 &shellCommand('rocks run host "rm -rf /tmp/thrift"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field when timing
#   runs (flush_caches.pl is a Greenstone helper script)
if ($flush_diskcache)
{
 print " * Flushing disk cache... ";
 &shellCommand('flush_caches.pl');
 if ($is_rocks_cluster)
 {
  &shellCommand('rocks run host "flush_caches.pl"');
 }
 print "Done!\n";
}
256
# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
# where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
# - read the infodbtype setting straight out of collect.cfg
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
 $server_prefix = uc($1);
 print " * Starting " . $server_prefix . "Server... ";
 # - start the server on the head node and retrieve the host and port from
 #   the output ($$ is this script's PID, used as the registration token)
 my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
 my $launcher_output = &shellCommand($launcher_command);
 if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
 {
  $server_host = $1;
  $server_port = $2;
  print "running on " . $server_host . ":" . $server_port . "\n";
 }
 else
 {
  print "Failed!\n";
  exit;
 }
 # - use the client tool to add ourselves as a listener ("#a:<pid>"), which
 #   keeps the server alive until we deregister in step 5
 print " * Registering as listener... ";
 my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
 &shellCommand($client_command);
 print "Done!\n";
}
else
{
 print "Error! True Hadoop processing is only available when Greenstone is\n";
 print "       configured to use either GDBMServer or TDBServer.\n";
 exit;
}
295
# 3.5 Start up the thrift server(s) if we've been asked to
if ($start_thrift)
{
 if ($is_rocks_cluster)
 {
  # - on a cluster the Thrift servers run on every compute node
  print " * Starting Thrift Servers (on compute nodes)... ";
  print "[DEBUG]\n" . &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh start"') . "\n\n";
 }
 # single server
 else
 {
  print " * Starting Thrift Server... ";
  &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh start');
 }
 print "Done!\n";
}

# - the archives directory as the Hadoop job will address it: the NFS path
#   directly, or the HDFS URL with its scheme swapped for the chosen
#   filesystem-driver prefix (HDThriftFS:// or HDFSShell://)
my $actual_archives_dir;
if ($use_nfs)
{
 $actual_archives_dir = $nfs_output_dir;
}
else
{
 $actual_archives_dir = $hdfs_output_dir;
 $actual_archives_dir =~ s/hdfs:\/\//$hdfs_fs_prefix/;
}
323
# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
# and allow the FileInputFormat to split it up into files to be processed
# in Greenstone. This works for collections with one file per document, like
# Lorem and ReplayMe, but might not work well with multiple file documents
# such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
&shellCommand('echo "host:' . $ENV{'HDFSHOST'} . '" > ' . $hadoop_log);
# - assemble the hadoop jar invocation; NOTE(review): $ENV{'HADOOP_PREFIX'}
#   is not validated in the BEGIN block, so an unset value would pass an
#   empty argument here - confirm setup.bash always exports it
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest ';
$hadoop_command .= '"' . $gsdl_home . '" '; # Greenstone's home dir
$hadoop_command .= '"' . $ENV{'HADOOP_PREFIX'} . '" '; # Hadoop's home dir
$hadoop_command .= $collection . ' '; # The collection name
$hadoop_command .= '"' . $actual_archives_dir . '" '; # Collection archive dir
$hadoop_command .= '"' . $hdfs_fs_prefix . '" '; # Prefix for talking to HDFS (driver)
$hadoop_command .= '"' . $hdfs_input_dir . '" '; # HDFS in
$hadoop_command .= '"' . $hdfs_output_dir . '" '; # HDFS out
$hadoop_command .= ' >> ' . $hadoop_log . ' 2>&1'; # Redirect to log
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
 print " * Deregistering as listener and shutting down... ";
 # - deregister as a listener ("#r:<pid>" mirrors the "#a:<pid>" in step 3)
 my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
 &shellCommand($client_command1);
 # - send quit command
 my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
 &shellCommand($client_command2);
 print "Done!\n";
}
356
# 5.5 We started them - so we better stop the thrift servers too
if ($start_thrift)
{
 if ($is_rocks_cluster)
 {
  print " * Stopping Thrift Servers (on compute nodes)... ";
  &shellCommand('rocks run host "cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && ./thriftctl.sh stop"');
 }
 # single server
 else
 {
  print " * Stopping Thrift Server... ";
  # BUGFIX: this previously ran 'thriftctl.sh start', leaving the Thrift
  # server running forever on single-server installs; it must be 'stop'
  &shellCommand('cd ' . $ENV{'GEXTPARALLELBUILDING'} . '/packages/ThriftFS-0.9.0/bin && thriftctl.sh stop');
 }
 print "Done!\n";
}
374
# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files: anything written to /tmp/greenstone plus the collection logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
 &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
 &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files: scp'd back from each compute node to the head node's
#   results directory (assumes passwordless scp between nodes)
if ($is_rocks_cluster)
{
 &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
 &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report from the collected hadoop log
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up the /tmp scratch areas (locally and on compute nodes)
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
 &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
 &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;
409
# /** @function debugPrint
#  * Emit a "[Debug] ..." line to STDOUT, but only when the global -debug
#  * flag was given on the command line; otherwise do nothing.
#  * @param $message the text to print (a newline is appended)
#  */
sub debugPrint
{
 my ($message) = @_;
 return unless ($debug);
 print '[Debug] ' . $message . "\n";
}
# /** debugPrint() **/
421
# /** @function hdfsCommand
#  * Run a 'hadoop fs -<command>' shell invocation over one or more paths
#  * (each path is double-quoted; stderr is folded into stdout).
#  * @param $command the fs subcommand (e.g. 'mkdir', 'put', 'rmr')
#  * @param @_       the path argument(s)
#  * @return the exit status word $? of the hadoop invocation
#  *         (NOTE(review): under -dry_run nothing executes, so $? is stale)
#  */
sub hdfsCommand
{
 my ($command, @paths) = @_;
 my $quoted_paths = '"' . join('" "', @paths) . '"';
 &shellCommand($hadoop_exe . ' fs -' . $command . ' ' . $quoted_paths . ' 2>&1');
 return $?;
}
# /** hdfsCommand() **/
433
# /** @function hdfsTest
#  * Wrap 'hadoop fs -test -<flag>' and compare its exit status against an
#  * expected value (e.g. flag 'd' with target 0 asks "does this directory
#  * exist in HDFS?").
#  * @param $test_flag     the single-letter test flag ('d', 'e', 'z', ...)
#  * @param $expected_exit the exit status that counts as success
#  * @param @_             the HDFS path(s) to test
#  * @return true when the actual exit status equals the expected one
#  */
sub hdfsTest
{
 my ($test_flag, $expected_exit, @test_paths) = @_;
 my $exit_status = &hdfsCommand('test -' . $test_flag, @test_paths);
 return ($exit_status == $expected_exit);
}
# /** hdfsTest() **/
444
# /** @function printUsage
#  * Print the usage string and exit. FIX: the previous message listed
#  * obsolete positional arguments (<refresh_import>, "removeold"/"keepold")
#  * that the argument parser no longer accepts; it now matches the options
#  * actually parsed at the top of this script.
#  */
sub printUsage
{
 print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-start_thrift] [-disable_thrift] [-refresh_import] [-flush_diskcache] [-use_nfs]\n";
 exit;
}
# /** printUsage() **/
453
454
## @function recursiveCopy()
# Recursively copy the contents of a local directory into HDFS, preserving
# relative paths. Directories are created as encountered; files are copied
# with 'cp' when HDFS is NFS-mounted, otherwise with 'hadoop fs -put'.
# FIX: use a lexical dirhandle instead of the bareword DH (the bareword is a
# single global handle, fragile in a recursive function) and include $! in
# the opendir failure message.
# @param $src_dir  the local source directory
# @param $hdfs_dir the target directory (HDFS URL or NFS path)
# @return the number of files copied
sub recursiveCopy
{
 my ($src_dir, $hdfs_dir) = @_;
 my $file_count = 0;
 # - create the directory in HDFS
 if ($use_nfs)
 {
  &shellCommand('mkdir "' . $hdfs_dir . '"');
 }
 else
 {
  &hdfsCommand('mkdir', $hdfs_dir);
 }
 # - search $src_dir for files
 opendir(my $dir_handle, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir . " (" . $! . ")");
 my @files = readdir($dir_handle);
 closedir($dir_handle);
 foreach my $file (@files)
 {
  # - skip dot prefix files (also skips '.' and '..')
  if ($file !~ /^\./)
  {
   my $src_path = $src_dir . '/' . $file;
   # - recurse directories, remembering to extend HDFS dir too
   if (-d $src_path)
   {
    my $new_hdfs_dir = $hdfs_dir . '/' . $file;
    $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
   }
   # - and use 'put' (or 'cp' over NFS) to copy files
   else
   {
    my $hdfs_path = $hdfs_dir . '/' . $file;
    if ($use_nfs)
    {
     &shellCommand('cp "' . $src_path . '" "' . $hdfs_path . '"');
    }
    else
    {
     &hdfsCommand('put', $src_path, $hdfs_path);
    }
    $file_count++;
   }
  }
 }
 return $file_count;
}
## recursiveCopy() ##
505
506
# /** @function shellCommand
#  * Execute a shell command via backticks and return its captured stdout.
#  * The command is always echoed through &debugPrint() first; under
#  * -dry_run it is echoed only and never executed (empty string returned).
#  * @param $cmd the full shell command line to run
#  * @return the command's stdout, or '' in dry-run mode
#  */
sub shellCommand
{
 my ($cmd) = @_;
 &debugPrint($cmd);
 my $captured_output = $dry_run ? '' : `$cmd`;
 return $captured_output;
}
# /** shellCommand() **/
521
# /** @function urlCat
#  * Join any number of URL/path components with '/' separators. No
#  * normalization is performed (duplicate slashes in the components are
#  * kept as-is).
#  * @param @_ the components to join
#  * @return the joined string
#  */
sub urlCat
{
 my @components = @_;
 return join('/', @components);
}
# /** urlCat() **/
530
# /** @function dirIsEmpty
#  * Report whether a directory contains no entries other than '.' and '..'.
#  * A path that does not exist at all is also treated as empty.
#  * @param $dir the directory path to inspect
#  * @return 1 when empty (or nonexistent), 0 otherwise
#  */
sub dirIsEmpty
{
 my ($dir) = @_;
 return 1 unless (-e $dir);
 opendir(my $dir_handle, $dir) or die $!;
 my @entries = grep { !m/\A\.{1,2}\Z/ } readdir($dir_handle);
 closedir($dir_handle);
 return (@entries ? 0 : 1);
}
# /** dirIsEmpty() **/
546
547
## @function recursiveDelete()
# Recursively delete $dir, but only when it lies under $prefix - a safety
# guard so that a malformed path can never wipe anything outside the
# sandbox area. FIX: the prefix was previously interpolated into the regex
# unescaped, so metacharacters in a path (e.g. '.') matched loosely, and an
# empty/undef prefix matched *every* path, defeating the guard entirely.
# @param $dir    the directory (or glob) to delete
# @param $prefix the literal path prefix $dir must start with
sub recursiveDelete
{
 my ($dir, $prefix) = @_;
 # - require a non-empty prefix and match it literally via \Q...\E
 if (defined $prefix && $prefix ne '' && $dir =~ /^\Q$prefix\E/)
 {
  &shellCommand('rm -rf "' . $dir . '"');
 }
}
## recursiveDelete() ##
Note: See TracBrowser for help on using the repository browser.