source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27414

Last change on this file since 27414 was 27414, checked in by jmt12, 11 years ago

Allowing more processing arguments to be configured at the call, and passing information such as the desired HDFS driver through to Hadoop processing

  • Property svn:executable set to *
File size: 9.7 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}
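# A minimal sketch of the environment this script assumes setup.bash has
# already exported (the paths, hostname and port below are illustrative only;
# GEXTPARALLELBUILDING_INSTALLED and HADOOP_PREFIX are also read further down):
#   export GSDLHOME=/usr/local/greenstone
#   export GSDLOS=linux
#   export HDFSHOST=head-node
#   export HDFSPORT=9000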

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $use_thrift = 1;
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # a full path may be prepended if 'hadoop' is not on the PATH
my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configurations
my $hdfs_fs_prefix = 'HDThriftFS://';
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
else
{
  &printUsage();
}
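# Example invocation (the collection name 'demo' is illustrative):
#   hadoop_import.pl demo -refresh_import -debug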
my $offset = 1;
while (defined $ARGV[$offset])
{
  if ($ARGV[$offset] eq '-debug')
  {
    $debug = 1;
  }
  if ($ARGV[$offset] eq '-disable_thrift')
  {
    $use_thrift = 0;
  }
  if ($ARGV[$offset] eq '-dry_run')
  {
    $dry_run = 1;
  }
  if ($ARGV[$offset] eq '-refresh_import')
  {
    $refresh_import = 1;
  }
  $offset++;
}

if (!$use_thrift)
{
  $hdfs_fs_prefix = 'HDFSShell://';
}

my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
my $hdfs_input_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat('hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'}, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
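# For example, with HDFSHOST=head-node, HDFSPORT=9000, user 'jdoe' and
# collection 'demo' (all illustrative values), $hdfs_input_dir becomes:
#   hdfs://head-node:9000/user/jdoe/gsdl/collect/demo/import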

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of the import directory into HDFS,
  #   ensuring that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
print " * Clearing existing archives directory for this collection... ";
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  &hdfsCommand('rmr', $hdfs_output_dir);
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
  &shellCommand('rm /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm /tmp/gsimport*"');
}
print "Done!\n";

# - flush DNS cache too, so we are playing on a level field
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
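  # The launcher output is expected to contain a line of the form matched
  # below (the host and port shown here are illustrative only):
  #   Server now listening on head-node:12345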
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $hdfs_fs_prefix . '" "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . " " . $hdfs_input_dir . " " . $hdfs_output_dir . " > " . $hadoop_log . " 2>&1";
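# The assembled command has roughly the following shape (all concrete values
# are illustrative; the jar path and main class come from the string built above):
#   hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#     "<GSDLHOME>" "HDThriftFS://" "<HADOOP_PREFIX>" demo \
#     hdfs://head-node:9000/user/jdoe/gsdl/collect/demo/import \
#     hdfs://head-node:9000/user/jdoe/gsdl/collect/demo/archives \
#     > <results_dir>/hadoop.log 2>&1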
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  if ($debug)
  {
    print STDERR "[DEBUG] $hdfs_command\n";
  }
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
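# For example, the earlier call &hdfsTest('d', 0, $hdfs_input_dir) runs
# 'hadoop fs -test -d <dir>' and returns true when that command exits with
# status 0, i.e. when the import directory already exists in HDFS.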

# /**
# */
sub printUsage
{
  print STDERR "usage: hadoop_import.pl <collection> [-debug] [-dry_run] [-disable_thrift] [-refresh_import]\n\n";
  exit;
}
# /** printUsage() **/

# /**
# */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/
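# Note that urlCat simply joins its arguments with '/'. For example (values
# illustrative): &urlCat('hdfs://head-node:9000', 'user', 'jdoe') yields
# 'hdfs://head-node:9000/user/jdoe', so callers pass components without
# leading or trailing slashes, as the calls above do.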

# /**
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  @files ? 0 : 1;
}
# /** dirIsEmpty() **/