source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27058

Last change on this file since 27058 was 27058, checked in by jmt12, 11 years ago

Adding data locality report generation to Hadoop greenstone imports

  • Property svn:executable set to *
File size: 8.8 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

print "===== Greenstone Import using Hadoop =====\n";

# 0. Init
my $collection = 'test';
my $debug = 0;
my $dry_run = 0;
my $gsdl_home = $ENV{'GSDLHOME'};
my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
my $hadoop_exe = 'hadoop'; # you may need to specify the full path
my $cluster_head = $ENV{'HDFSHOST'}; # may not hold for more advanced configurations
my $hdfs_fs_prefix = 'hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'};
my $refresh_import = 0;
my $username = `whoami`;
chomp($username);

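# - probe for the 'rocks' command; a zero exit status is taken to mean we are
#   running on a Rocks cluster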
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
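#   (of the form hdfs://<hdfshost>:<hdfsport>/user/<username>/gsdl/collect/<collection>/import;
#   the bracketed placeholders are illustrative only)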
my $hdfs_input_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'archives');

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of import directory into HDFS ensuring
  #   that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  print " * Clearing existing archives directory for this collection... ";
  &hdfsCommand('rmr', $hdfs_output_dir);
  print "Done!\n";
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
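  # - the launcher is expected to report a line of the form
  #   "Server now listening on <host>:<port>", which is parsed below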
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit;
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit;
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection
print " * Running import using Hadoop...";
my $hadoop_log = $gs_results_dir . '/hadoop.log';
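# - the job is handed the local GSDLHOME, the HDFS host and port, the Hadoop
#   installation prefix, the collection name, and the HDFS input and output
#   directories; stdout and stderr are captured in hadoop.log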
my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $ENV{'HDFSHOST'} . '" ' . $ENV{'HDFSPORT'} . ' "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . " " . $hdfs_input_dir . " " . $hdfs_output_dir . " > " . $hadoop_log . " 2>&1";
&shellCommand($hadoop_command);
print "Done!\n";

# 5. If we ran *Server infodbs, we now need to shut them down
if ($server_prefix ne '')
{
  print " * Deregistering as listener and shutting down... ";
  # - deregister as a listener
  my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
  &shellCommand($client_command1);
  # - send quit command
  my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
  &shellCommand($client_command2);
  print "Done!\n";
}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
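#  - builds and runs 'hadoop fs -<command> <paths>', returning the command's exit status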
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/

# /** @function hdfsTest
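#  - runs 'hadoop fs -test -<flag>' on the given path and compares the exit
#    status against the expected value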
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
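#  - mirrors a local directory into HDFS, recursing into subdirectories and
#    'put'-ting each regular file (dot-prefixed entries are skipped); returns
#    the number of files copied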
# */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
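#  - logs the command via debugPrint and, unless this is a dry run, executes it
#    with backticks; returns the captured output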
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/

# /** @function urlCat
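#  - joins its arguments with '/' to form a URL, e.g. urlCat('hdfs://host:port', 'user', 'bob')
#    yields 'hdfs://host:port/user/bob' (illustrative values only)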
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
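#  - returns 1 if the directory does not exist or contains no entries other
#    than '.' and '..', and 0 otherwise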
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/ } readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/