source: gs2-extensions/parallel-building/trunk/src/bin/script/hadoop_import.pl@ 27126

Last change on this file since 27126 was 27126, checked in by jmt12, 11 years ago

Extra clean-up commands (such as removing cached versions of video processing) and a call to flush_caches.pl to try to clear disk and DNS caches

  • Property svn:executable set to *
File size: 9.1 KB
#!/usr/bin/perl
use strict;
use warnings;

# Requires setup.bash to have been sourced
BEGIN
{
  die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
  die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};
  die "HDFS HOST not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSHOST'};
  die "HDFS PORT not set (set in <gsdl>/ext/parallel_processing/setup.bash)\n" unless defined $ENV{'HDFSPORT'};
}

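# Example invocation (assuming a collection directory named 'demo' exists under
# $GSDLHOME/collect and setup.bash has been sourced):
#   ./hadoop_import.pl demo
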
14print "===== Greenstone Import using Hadoop =====\n";
15
16# 0. Init
17my $collection = 'test';
18my $debug = 0;
19my $dry_run = 0;
20my $gsdl_home = $ENV{'GSDLHOME'};
21my $gsdl_hadoop_ext = $ENV{'GEXTPARALLELBUILDING_INSTALLED'};
22my $hadoop_exe = 'hadoop'; # you may add path
23my $cluster_head = $ENV{'HDFSHOST'}; # may not be true on advanced configs
24my $hdfs_fs_prefix = 'hdfs://' . $ENV{'HDFSHOST'} . ':' . $ENV{'HDFSPORT'};
25my $refresh_import = 0;
26my $username = `whoami`;
27chomp($username);
28
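# Detect whether we are running on a Rocks cluster: the 'rocks' command only
# exists on such clusters, and later steps use 'rocks run host' to reach the
# compute nodes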
`rocks > /dev/null 2>&1`;
my $is_rocks_cluster = ($? == 0);

# 1. Read and validate parameters
if (defined $ARGV[0])
{
  $collection = $ARGV[0];
}
if (defined $ARGV[1])
{
  # - optional second argument forces the import directory in HDFS to be refreshed
  $refresh_import = $ARGV[1];
}
my $gs_collection_dir = $gsdl_home . '/collect/' . $collection;
my $gs_import_dir = $gs_collection_dir . '/import';
if (!-d $gs_import_dir)
{
  die("Error! Collection's import directory cannot be found: " . $gs_import_dir . "\n");
}
my $gs_results_dir = $gs_collection_dir . '/results';
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
$gs_results_dir .= '/' . time();
if (!-d $gs_results_dir)
{
  mkdir($gs_results_dir, 0755);
}
my $gs_archives_dir = $gs_collection_dir . '/archives';
# - directories within HDFS
my $hdfs_input_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'import');
my $hdfs_output_dir = &urlCat($hdfs_fs_prefix, 'user', $username, 'gsdl', 'collect', $collection, 'archives');
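# For example, with HDFSHOST=namenode, HDFSPORT=9000, user 'jdoe' and the
# default 'test' collection (illustrative values only), $hdfs_input_dir becomes
# hdfs://namenode:9000/user/jdoe/gsdl/collect/test/import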

# 2. Copy the import directory into HDFS
print " * Replicating import directory in HDFS...";
# - check if import directory already exists
my $hdfs_import_exists = &hdfsTest('d', 0, $hdfs_input_dir);
if ($refresh_import || !$hdfs_import_exists)
{
  # - clear out the old import directory
  if ($hdfs_import_exists)
  {
    &hdfsCommand('rmr', $hdfs_input_dir);
  }
  # - now recursively copy the contents of the import directory into HDFS,
  #   ensuring that relative paths are maintained
  my $file_count = &recursiveCopy($gs_import_dir, $hdfs_input_dir);
  &debugPrint($file_count . " files 'putted'");
  print "Done!\n";
}
else
{
  print "Already exists!\n";
}
# - clear out the archives regardless
print " * Clearing existing archives directory for this collection... ";
if (-e $gs_archives_dir)
{
  &shellCommand('rm -rf "' . $gs_archives_dir . '"');
}
mkdir($gs_archives_dir, 0755);
if (&hdfsTest('d', 0, $hdfs_output_dir))
{
  &hdfsCommand('rmr', $hdfs_output_dir);
}
# - clear out any old logs
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('rm /tmp/greenstone/*.*');
  &shellCommand('rm /tmp/gsimport*');
}
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm /tmp/greenstone/*.*"');
  &shellCommand('rocks run host "rm /tmp/gsimport*"');
}
print "Done!\n";

# - flush the disk and DNS caches too, so we are playing on a level field
&shellCommand('flush_caches.pl');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "flush_caches.pl"');
}

# 3. Special case for *Server type infodbs (namely TDBServer and GDBMServer)
#    where we start the server now to ensure it lives on the head node
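#    The collection's etc/collect.cfg is expected to contain a line of the form
#    "infodbtype gdbmserver" or "infodbtype tdbserver" (matched case-insensitively
#    below)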
my $server_host = '';
my $server_port = '';
my $configuration_path = $gs_collection_dir . '/etc/collect.cfg';
my $infodbtype = `grep -P "^infodbtype" $configuration_path`;
my $server_prefix = '';
if ($infodbtype =~ /^infodbtype\s+(gdbm|tdb)server/i)
{
  $server_prefix = uc($1);
  print " * Starting " . $server_prefix . "Server... ";
  # - start the server on the head node and retrieve the host and port from
  #   the output
  my $launcher_command = $server_prefix . "Server.pl " . $$ . " " . $collection;
  my $launcher_output = &shellCommand($launcher_command);
  if ($launcher_output =~ /Server now listening on ([^:]+):(\d+)/)
  {
    $server_host = $1;
    $server_port = $2;
    print "running on " . $server_host . ":" . $server_port . "\n";
  }
  else
  {
    print "Failed!\n";
    exit(1);
  }
  # - use the client tool to add ourselves as a listener
  print " * Registering as listener... ";
  my $client_command = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#a:" . $$ . "\"";
  &shellCommand($client_command);
  print "Done!\n";
}
else
{
  print "Error! True Hadoop processing is only available when Greenstone is\n";
  print "       configured to use either GDBMServer or TDBServer.\n";
  exit(1);
}

# 4. Running Hadoop - we hand in the import directory (within HDFS) as the input
#    and allow the FileInputFormat to split it up into files to be processed
#    in Greenstone. This works for collections with one file per document, like
#    Lorem and ReplayMe, but might not work well with multiple-file documents
#    such as the Demo collection
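# With illustrative values only (GSDLHOME=/opt/greenstone, HDFSHOST=namenode,
# HDFSPORT=9000, HADOOP_PREFIX=/opt/hadoop, user 'jdoe', collection 'test'),
# the command constructed below expands to roughly:
#   hadoop jar <ext>/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest \
#     "/opt/greenstone" "namenode" 9000 "/opt/hadoop" test \
#     hdfs://namenode:9000/user/jdoe/gsdl/collect/test/import \
#     hdfs://namenode:9000/user/jdoe/gsdl/collect/test/archives \
#     > <results_dir>/hadoop.log 2>&1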
151print " * Running import using Hadoop...";
152my $hadoop_log = $gs_results_dir . '/hadoop.log';
153my $hadoop_command = $hadoop_exe . ' jar ' . $gsdl_hadoop_ext . '/lib/hadoop-greenstone.jar org.nzdl.gsdl.HadoopGreenstoneIngest "' . $gsdl_home . '" "' . $ENV{'HDFSHOST'} . '" ' . $ENV{'HDFSPORT'} . ' "' . $ENV{'HADOOP_PREFIX'} . '" ' . $collection . " " . $hdfs_input_dir . " " . $hdfs_output_dir . " > " . $hadoop_log . " 2>&1";
154&shellCommand($hadoop_command);
155print "Done!\n";
156
157# 5. If we ran *Server infodbs, we now need to shut them down
158if ($server_prefix ne '')
159{
160 print " * Deregistering as listener and shutting down... ";
161 # - deregister as a listener
162 my $client_command1 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#r:" . $$ . "\"";
163 &shellCommand($client_command1);
164 # - send quit command
165 my $client_command2 = $server_prefix . "Client.pl " . $server_host . " " . $server_port . " \"#q:" . $$ . "\"";
166 &shellCommand($client_command2);
167 print "Done!\n";
168}


# 6. Gather logs
print " * Gathering logs from compute nodes... ";
# - local files
if (!&dirIsEmpty('/tmp/greenstone'))
{
  &shellCommand('cp /tmp/greenstone/*.* ' . $gs_results_dir);
}
if (-d $gs_collection_dir . '/logs')
{
  &shellCommand('cp ' . $gs_collection_dir . '/logs/*.* ' . $gs_results_dir);
}
# - remote files
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "scp /tmp/greenstone/*.* ' . $cluster_head . ':' . $gs_results_dir . '"');
  &shellCommand('rocks run host "scp /tmp/gsimport-*/logs/*.log ' . $cluster_head . ':' . $gs_results_dir . '"');
}
print "Done!\n";
# - generate data locality report
&shellCommand('parse_task_info_from_hadoop_log.pl "' . $gs_results_dir . '"');

# 7. Done - clean up
print " * Cleaning up temporary files... ";
&shellCommand('rm -rf /tmp/greenstone');
&shellCommand('rm -rf /tmp/gsimport*');
if ($is_rocks_cluster)
{
  &shellCommand('rocks run host "rm -rf /tmp/greenstone"');
  &shellCommand('rocks run host "rm -rf /tmp/gsimport*"');
}
print "Done!\n";
print "Complete!\n\n";

exit;

# /** @function debugPrint
#   Print a message prefixed with [Debug], but only when $debug is enabled
# */
sub debugPrint
{
  my $msg = shift(@_);
  if ($debug)
  {
    print "[Debug] " . $msg . "\n";
  }
}
# /** debugPrint() **/

# /** @function hdfsCommand
#   Run an HDFS filesystem command, i.e. 'hadoop fs -<command> <paths>', and
#   return the resulting status from $?
# */
sub hdfsCommand
{
  my $command = shift(@_);
  my $paths = '"' . join('" "', @_) . '"';
  my $hdfs_command = $hadoop_exe . ' fs -' . $command . ' ' . $paths . ' 2>&1';
  &shellCommand($hdfs_command);
  return $?;
}
# /** hdfsCommand() **/
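# Example (illustrative): &hdfsCommand('mkdir', $hdfs_output_dir) runs
#   hadoop fs -mkdir "<hdfs_output_dir>" 2>&1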

# /** @function hdfsTest
#   Run 'hadoop fs -test -<flag>' over the given path(s) and compare the
#   resulting status against the expected value
# */
sub hdfsTest
{
  my $command = shift(@_);
  my $test_target = shift(@_);
  my $result = &hdfsCommand('test -' . $command, @_);
  return ($result == $test_target);
}
# /** hdfsTest() **/
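# Example (illustrative): &hdfsTest('d', 0, $hdfs_input_dir) is true when the
#   import directory already exists in HDFS (i.e. 'hadoop fs -test -d' succeeds)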

# /** @function printUsage
# */
sub printUsage
{
  print "usage: hadoop_import.pl <collection> [<refresh_import>] [<\"removeold\"|\"keepold\">]\n";
  exit;
}
# /** printUsage() **/

# /** @function recursiveCopy
#   Copy the contents of a local directory into HDFS one file at a time,
#   preserving the relative directory structure, and return the number of
#   files copied
# */
sub recursiveCopy
{
  my ($src_dir, $hdfs_dir) = @_;
  my $file_count = 0;
  # - create the directory in HDFS
  &hdfsCommand('mkdir', $hdfs_dir);
  # - search $src_dir for files
  opendir(DH, $src_dir) or die("Error! Cannot open directory for reading: " . $src_dir);
  my @files = readdir(DH);
  closedir(DH);
  foreach my $file (@files)
  {
    # - skip dot prefix files
    if ($file !~ /^\./)
    {
      my $src_path = $src_dir . '/' . $file;
      # - recurse directories, remembering to extend HDFS dir too
      if (-d $src_path)
      {
        my $new_hdfs_dir = $hdfs_dir . '/' . $file;
        $file_count += &recursiveCopy($src_path, $new_hdfs_dir);
      }
      # - and use 'put' to copy files
      else
      {
        my $hdfs_path = $hdfs_dir . '/' . $file;
        &hdfsCommand('put', $src_path, $hdfs_path);
        $file_count++;
      }
    }
  }
  return $file_count;
}
# /** recursiveCopy() **/

# /** @function shellCommand
#   Echo the command when debugging, execute it (unless this is a dry run) and
#   return its output
# */
sub shellCommand
{
  my $cmd = shift(@_);
  my $output = '';
  &debugPrint($cmd);
  if (!$dry_run)
  {
    $output = `$cmd`;
  }
  return $output;
}
# /** shellCommand() **/
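# Note: setting $debug = 1 and $dry_run = 1 near the top of this script makes
#   every shell command print via debugPrint without being executed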

# /** @function urlCat
#   Join the given parts into a path/URL using '/' as the separator
# */
sub urlCat
{
  my $url = join('/', @_);
  return $url;
}
# /** urlCat() **/

# /** @function dirIsEmpty
#   Return 1 if the given directory does not exist or contains no entries
#   other than '.' and '..', otherwise return 0
# */
sub dirIsEmpty
{
  my $dir = shift(@_);
  my @files;
  if (-e $dir)
  {
    opendir(DIR, $dir) or die $!;
    @files = grep { !m/\A\.{1,2}\Z/} readdir(DIR);
    closedir(DIR);
  }
  return @files ? 0 : 1;
}
# /** dirIsEmpty() **/