source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27495

Last change on this file since 27495 was 27495, checked in by jmt12, 11 years ago

removing doubled up debug comments and putting some paths in speechmarks to be safer

  • Property svn:executable set to *
File size: 9.9 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # add a path here if 'hadoop' is not on the PATH
my $cluster_head = $ENV{'HDFSHOST'}; # may not hold on advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

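# Detect whether we are running on a Rocks cluster: the probe below simply
# tries to run the 'rocks' command-line tool; if it exits successfully (the
# assumption being that this only happens on a Rocks cluster node), commands
# can later be fanned out to the compute nodes with 'rocks run host'.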
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  # no collection given - print the usage message and stop rather than
  # silently importing the default 'test' collection
  &printUsage();
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}
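
# Example invocation (hypothetical collection name and flag choice):
#   ./hadoop_import.pl demo -refresh_import -debug
# would re-copy the 'demo' collection's import directory into HDFS and echo
# every shell command as it is issued.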

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');

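# For example (hypothetical user 'jdoe' importing the default 'test'
# collection), the directories above come out as:
#   $hdfs_input_dir  = hdfs:///user/jdoe/gsdl/collect/test/import
#   $hdfs_output_dir = hdfs:///user/jdoe/gsdl/collect/test/archives
# i.e. paths on Hadoop's default filesystem; the commented-out variants above
# spell out the namenode host and port explicitly instead.
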
# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
print " * Clearing existing archives directory for this collection... ";
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  &hdfsCommand('rmr', $hdfs_output_dir);
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
  &shellCommand('rm /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm /tmp/gsimport*"');
}
print "Done!\n";

# - flush DNS cache too, so we are playing on a level field
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" "$configuration_path"`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print " configured to use either GDBMServer or TDBServer.\n";
  exit;
}

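# A note on the *Client.pl messages used above and again in step 5, as read
# from this script: "#a:<pid>" registers this process as a listener on the
# server, "#r:<pid>" deregisters it, and "#q:<pid>" sends the quit command.
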
# 4. Running Hadoop - we hand in the import directory (within HDFS) as the
#    input and allow the FileInputFormat to split it up into files to be
#    processed in Greenstone. This works for collections with one file per
#    document, like Lorem and ReplayMe, but might not work well with
#    multiple-file documents such as the Demo collection.
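# The arguments handed to the hadoop-greenstone.jar ingest job below are, in
# order: the Greenstone home directory, the HDFS filesystem prefix, the Hadoop
# installation prefix ($HADOOP_PREFIX), the collection name, the HDFS input
# directory and the HDFS output directory; stdout and stderr are captured in
# hadoop.log under the results directory.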
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' . $hdfs_input_dir . '" "' . $hdfs_output_dir . '" > ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
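
# For illustration, &hdfsCommand('rmr', $hdfs_output_dir) ends up running
#   hadoop fs -rmr "<hdfs_output_dir>" 2>&1
# and returns the command's exit status ($?). Note that in -dry_run mode
# nothing is executed, so the returned status is whatever $? already held.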

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
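
# Example: &hdfsTest('d', 0, $hdfs_input_dir) runs 'hadoop fs -test -d' on the
# given path and returns true when the exit status equals the expected value
# (0), i.e. when the directory already exists in HDFS.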

# /** @function printUsage
#  *  Print the usage message and exit.
#  */
sub printUsage
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
#  *  Mirror a local directory tree into HDFS, one 'put' per file, and return
#  *  the number of files copied.
#  */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/
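
# Used in step 2 above as &recursiveCopy($gs_import_dir, $hdfs_input_dir).
# Anything whose name starts with a dot is skipped, and each file is pushed
# with its own 'hadoop fs -put' invocation.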

# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/
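
# Every external command in this script funnels through shellCommand, so
# -debug echoes each command before it runs and -dry_run suppresses execution
# entirely. Be aware that in a dry run the empty output means later checks
# (such as the 'Server now listening on host:port' match in step 3) will
# report failure.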

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
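
# Example: &urlCat('hdfs://', 'user', 'jdoe', 'gsdl', 'collect', 'test', 'import')
# (with a hypothetical user 'jdoe') returns
# 'hdfs:///user/jdoe/gsdl/collect/test/import' - the components are simply
# joined with '/'.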

# /** @function dirIsEmpty
#  *  Return 1 if the given directory does not exist or contains nothing but
#  *  '.' and '..', and 0 otherwise.
#  */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return (@files ? 0 : 1);
}
# /** dirIsEmpty() **/
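
# For example, &dirIsEmpty('/tmp/greenstone') is used when clearing out any
# old logs early on and again when gathering logs in step 6, so empty or
# missing log directories are simply skipped.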