source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27530

Last change on this file since 27530 was 27530, checked in by jmt12, 11 years ago

Clear out old logs, and add more comments about what the script is doing (it was stalling on disk sync because I had deleted a whole bunch of files, and I wondered what was happening)

  • Property svn:executable set to *
File size: 10.6 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
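
# For example (paths illustrative of a typical install), the environment is
# prepared by sourcing the setup scripts before running this script:
#   cd /path/to/greenstone2 && source setup.bash
#   source ext/parallel_processing/setup.bash   # sets HDFSHOST and HDFSPORT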

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # add the full path here if 'hadoop' is not on the PATH
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

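# - detect whether we are on a Rocks cluster: the backtick call below succeeds
#   (exit status 0) only when the 'rocks' command-line tool is available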
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n\n";
}
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}
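
# Example invocation (collection name illustrative):
#   hadoop_import.pl lorem -refresh_import -debug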

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
# - directories within HDFS
#my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_input_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'import');
#my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
my $hdfs_output_dir = &urlCat('hdfs://', 'user', $username, 'gsdl', 'collect', $collection, 'archives');
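# - with the defaults above these resolve to URLs of the form (username and
#   collection illustrative) hdfs:///user/jsmith/gsdl/collect/lorem/import and
#   hdfs:///user/jsmith/gsdl/collect/lorem/archives, since urlCat simply joins
#   its arguments with '/'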

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if the import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of the import directory into HDFS,
  # ensuring that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
my $gs_archives_dir = $gs_collection_dir . '/archives';
my $deleted_archives = 0;
if (-e $gs_archives_dir)
{
  print " * Clearing existing archives directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
  $deleted_archives = 1;
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  if (!$deleted_archives)
  {
    print " * Clearing existing archives directory for this collection... ";
  }
  &hdfsCommand('rmr', $hdfs_output_dir);
  $deleted_archives = 1;
}
if ($deleted_archives)
{
  print "Done!\n";
}
# - watch for cached directories in media-based collections
my $gs_cached_dir = $gs_collection_dir . '/cached';
if (-e $gs_cached_dir)
{
  print " * Clearing existing cached media directory for this collection... ";
  &shellCommand('rm -rf "' . $gs_cached_dir . '"');
  print "Done!\n";
}

# - clear out any old logs
print " * Clearing existing logs for this collection... ";
my $gs_logs_dir = $gs_collection_dir . '/logs';
if (!&dirIsEmpty($gs_logs_dir))
{
  &shellCommand('rm ' . $gs_logs_dir . '/*.*');
}
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
  &shellCommand('rm /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm /tmp/gsimport*"');
}
print "Done!\n";

# - flush the disk cache too, so we are playing on a level field
print " * Flushing disk cache... ";
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}
print "Done!\n";

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
# where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  # the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
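  # - the launcher output is expected to include a line of the form
  #   "Server now listening on <host>:<port>" (host and port values vary);
  #   it is parsed below to record where the server ended up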
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print " configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
# and allow the FileInputFormat to split it up into files to be processed
# in Greenstone. This works for collections with one file per document, like
# Lorem and ReplayMe, but might not work well with multiple-file documents
# such as the Demo collection.
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . ' "' . $hdfs_input_dir . '" "' . $hdfs_output_dir . '" > ' . $hadoop_log . ' 2>&1';
&shellCommand($hadoop_command);
print "Done!\n";
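# - for reference, the assembled command has the general shape (all values
#   illustrative):
#   hadoop jar <gext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#     "<gsdlhome>" "HDThriftFS://" "<hadoop_prefix>" <collection> \
#     "hdfs:///user/<user>/gsdl/collect/<collection>/import" \
#     "hdfs:///user/<user>/gsdl/collect/<collection>/archives" > <results_dir>/hadoop.log 2>&1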

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
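
# For example (path illustrative), &hdfsCommand('rmr', 'hdfs:///user/jsmith/gsdl/collect/lorem/import')
# runs `hadoop fs -rmr "hdfs:///user/jsmith/gsdl/collect/lorem/import" 2>&1`, and
# &hdfsTest('d', 0, $path) is true when the underlying `hadoop fs -test -d` exits with status 0.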

# /**
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n";
  exit;
}
# /** printUsage() **/

# /**
# */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /**
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/