source: gs2-extensions/tdb-edit/trunk/src/bin/script/TDBServer.pl@ 25402

Last change on this file since 25402 was 25402, checked in by jmt12, 12 years ago

A daemonized, socket-pooled, multithreaded server allowing remote editing of TDB databases. This is for use on clusters or in other circumstances where byte-level locking isn't supported (and so multiple-writer access to TDB causes exciting issues).

  • Property svn:executable set to *
File size: 13.4 KB
Line 
1#!/usr/bin/perl
2
3use strict;
4use warnings;
5
# Setup Environment
# Runs at compile time so that the 'use' statements further down can find the
# Greenstone (and extension) Perl libraries. Every directory is unshifted
# individually, so a directory added later is searched before one added
# earlier.
BEGIN
{
    die "GSDLHOME not set\n" unless defined $ENV{'GSDLHOME'};
    die "GSDLOS not set\n" unless defined $ENV{'GSDLOS'};

    foreach my $core_subdir ('perllib',
                             'perllib/cpan',
                             'perllib/cpan/XML/XPath',
                             'perllib/plugins',
                             'perllib/classify')
    {
        unshift (@INC, "$ENV{'GSDLHOME'}/$core_subdir");
    }

    # Library directories looked for inside each extension
    my @ext_subdirs = ('perllib',
                       'perllib/cpan',
                       'perllib/plugins',
                       'perllib/classify');

    # Greenstone 2 extensions live under GSDLHOME
    if (defined $ENV{'GSDLEXTS'})
    {
        foreach my $e (split(/:/, $ENV{'GSDLEXTS'}))
        {
            my $ext_prefix = "$ENV{'GSDLHOME'}/ext/$e";
            foreach my $ext_subdir (@ext_subdirs)
            {
                unshift (@INC, "$ext_prefix/$ext_subdir");
            }
        }
    }
    # Greenstone 3 extensions live under GSDL3SRCHOME. Without that variable
    # the prefix would collapse to "/ext/<name>", a path relative to the
    # filesystem root, so the extensions are skipped instead.
    if (defined $ENV{'GSDL3EXTS'} && defined $ENV{'GSDL3SRCHOME'})
    {
        foreach my $e (split(/:/, $ENV{'GSDL3EXTS'}))
        {
            my $ext_prefix = "$ENV{'GSDL3SRCHOME'}/ext/$e";
            foreach my $ext_subdir (@ext_subdirs)
            {
                unshift (@INC, "$ext_prefix/$ext_subdir");
            }
        }
    }

    # Manually installed CPAN package in GEXT*INSTALL
    # - only when the variable is set; otherwise "/share/perl/5.12.4" at the
    #   filesystem root would be pushed onto @INC (plus an 'uninitialized'
    #   warning)
    if (defined $ENV{'GEXTTDBEDIT_INSTALLED'})
    {
        unshift (@INC, $ENV{'GEXTTDBEDIT_INSTALLED'} . "/share/perl/5.12.4");
    }
}
46
47use Cwd;
48
49# We need to do a little file locking
50use Fcntl qw(:flock); #import LOCK_* constants
51
52# Advanced child process control allowing bidirectional pipes
53use IPC::Run qw(harness start pump finish);
54
55# we need to run as a daemon
56use Proc::Daemon;
57
58# The server will need to accept requests from multiple threads, and
59# so will need threads in and of itself
60use threads;
61use threads::shared;
62
63# Greenstone utility functions (filename_cat)
64use util;
65
66# A simple server that listens on a socket and 'forks' off child threads to
67# handle each incoming request
68use SocketsSwimmingThreadPoolServer;
69
# File-level settings. They are assigned while the script starts up and are
# only read after that, so they need no locking.
my $tdbexe = 'tdbcli';
my ($parent_pid, $no_daemon, $debug) = (0, 0, 0);
my $collection = '';
my ($server, $server_host, $server_port, $server_threads);
# State shared between the worker threads - these can be (and are) lock()ed
my %listeners :shared;
my $should_stop :shared = 0;
my $debug_log :shared = 0;

# Start-up banner
print "===== TDB Server =====\n",
      "Provides a server to allow multiple remote machines to simultaenously\n",
      "edit one or more TDB databases on the local machine. This is to work\n",
      "around NFS file locking issues when parallel processing on a cluster.\n";
90
# Script entry point.
#  1. validates the command line (<parent_pid> <collection> [-nodaemon] [-debug])
#  2. reads serverhost / serverport / threads from the collection's
#     tdbserver.conf
#  3. refuses to start if a lockfile for the collection already exists or the
#     TDB command line tool cannot be run
#  4. unless -nodaemon was given, daemonizes; the launching process then just
#     waits for the daemon's lockfile and reports where it is listening
#  5. the serving process publishes host:port in the lockfile, runs the
#     socket server until it is stopped, then removes the lockfile
MAIN:
{
    # Check arguments
    # - compulsory
    if (!defined $ARGV[0] || $ARGV[0] !~ /^\d+$/)
    {
        printUsageAndExit('Error! Missing parent process ID or not a PID');
    }
    $parent_pid = $ARGV[0];
    if (!defined $ARGV[1])
    {
        printUsageAndExit('Error! Missing active Greenstone collection name');
    }
    $collection = $ARGV[1];
    # - optional (unrecognised arguments are ignored)
    foreach my $option (@ARGV[2 .. $#ARGV])
    {
        if ($option eq "-nodaemon")
        {
            $no_daemon = 1;
        }
        if ($option eq "-debug")
        {
            $debug = 1;
        }
    }

    # Read in the collection specific configuration
    # - each recognised key is stored straight into the matching global
    my %setting_for = ('serverhost' => \$server_host,
                       'serverport' => \$server_port,
                       'threads'    => \$server_threads);
    my $cfg_path = util::filename_cat($ENV{'GSDLHOME'}, 'collect', $collection, 'tdbserver.conf');
    open(my $cfg_fh, '<', $cfg_path) or die("Failed to read config file: " . $cfg_path . "\nReason: " . $! . "\n");
    while (my $line = <$cfg_fh>)
    {
        if ($line =~ /^(\w+)\s+(.*)$/)
        {
            my ($key, $value) = ($1, $2);
            # drop trailing blanks (and the \r of a DOS formatted file)
            $value =~ s/\s+$//;
            if (exists $setting_for{$key})
            {
                ${$setting_for{$key}} = $value;
            }
        }
    }
    close($cfg_fh);

    # - all three settings are needed to create the server, so fail now with a
    #   clear message rather than later with 'uninitialized value' warnings
    foreach my $key ('serverhost', 'serverport', 'threads')
    {
        my $value = ${$setting_for{$key}};
        if (!defined $value || $value eq '')
        {
            die("Error! Missing '" . $key . "' setting in config file: " . $cfg_path . "\n");
        }
    }
    foreach my $key ('serverport', 'threads')
    {
        if (${$setting_for{$key}} !~ /^\d+$/)
        {
            die("Error! Setting '" . $key . "' must be a whole number in config file: " . $cfg_path . "\n");
        }
    }

    if ($debug)
    {
        print " - collection: " . $collection . "\n";
        print " - parent pid: " . $parent_pid . "\n";
        print " - no daemon? " . $no_daemon . "\n";
        print " - debug? " . $debug . "\n";
        print " - serverhost: " . $server_host . "\n";
        print " - serverport: " . $server_port . "\n";
        print " - threads: " . $server_threads . "\n";
        print "\n";
    }

    # Information about any running TDBServer is stored in a lockfile in
    # Greenstone's tmp directory (named after the active collection)
    my $tmp_dir = util::filename_cat($ENV{'GSDLHOME'}, 'collect', $collection, 'tmp');
    if (!-d $tmp_dir)
    {
        # the second -d test covers another process creating it in the meantime
        mkdir($tmp_dir, 0755) or -d $tmp_dir or die("Error! Failed to create directory: " . $tmp_dir . "\nReason: " . $! . "\n");
    }
    my $server_lockfile_path = util::filename_cat($tmp_dir, 'tdbserver.lock');

    # If already running, then exit
    print " * Testing if TDBServer for this collection already running... ";
    if (-e $server_lockfile_path)
    {
        print "Error! TDBServer already running!\n";
        print "Lockfile found at: " . $server_lockfile_path . "\n";
        exit(0);
    }
    print "All clear!\n";

    # Ensure we can see tdb edit tools on the path
    # - run without arguments the tool is expected to print its usage message
    print " * Testing for tool: " . $tdbexe . "... ";
    my $result = `$tdbexe 2>&1`;
    if (!defined $result)
    {
        $result = '';
    }
    if ($result !~ /usage:\s+\Q$tdbexe\E/)
    {
        print "Error! " . $tdbexe . " not available - check path.\n";
        exit(0);
    }
    print "Found!\n";

    # Daemonize
    # - Proc::Daemon::Init returns 0 inside the daemon and the daemon's pid to
    #   the process that launched it. With -nodaemon $pid simply stays 0 and
    #   this process does the serving itself.
    my $pid = 0;
    my $daemon_err_path = '';
    if (!$no_daemon)
    {
        print " * Spawning Daemon...\n" if ($debug);
        my $logs_dir = util::filename_cat($ENV{'GSDLHOME'}, 'collect', $collection, 'logs');
        if (!-d $logs_dir)
        {
            mkdir($logs_dir, 0755) or -d $logs_dir or die("Error! Failed to create directory: " . $logs_dir . "\nReason: " . $! . "\n");
        }
        my $daemon_out_path = util::filename_cat($logs_dir, 'tdbserver.out');
        $daemon_err_path = util::filename_cat($logs_dir, 'tdbserver.err');
        $pid = Proc::Daemon::Init( { work_dir => getcwd(),
                                     child_STDOUT => $daemon_out_path,
                                     child_STDERR => $daemon_err_path,
                                   } );
    }

    # Serving process (the daemon, or this process when run with -nodaemon)
    if ($pid == 0)
    {
        # - create server object
        print " * Creating pool of " . $server_threads . " threads listening on socket: " . $server_host . ":" . $server_port . "\n";
        $server = SocketsSwimmingThreadPoolServer->new(host=>$server_host,
                                                       port=>$server_port,
                                                       thread_count=>$server_threads,
#                                                      main_cb => \&exitCheck,
                                                       processor_cb => \&process);

        # - write a lockfile
        #   The content goes into a temporary file which is then renamed, so
        #   the lockfile never exists without its host:port line. (Anyone
        #   polling with -e could otherwise open it before it was written.)
        print " * Creating lock file: " . $server_lockfile_path . "\n";
        my $lockfile_tmp_path = $server_lockfile_path . '.' . $$ . '.tmp';
        open(my $lock_out_fh, ">", $lockfile_tmp_path) or die("Error! Failed to open file for writing: " . $lockfile_tmp_path . "\nReason: " . $! . "\n");
        print $lock_out_fh $server_host . ':' . $server_port;
        close($lock_out_fh) or die("Error! Failed to write file: " . $lockfile_tmp_path . "\nReason: " . $! . "\n");
        rename($lockfile_tmp_path, $server_lockfile_path) or die("Error! Failed to create lock file: " . $server_lockfile_path . "\nReason: " . $! . "\n");

        # Perform main loop
        # - loop is actually in Server code. start() only returns once server's stop
        #   command has been called
        print " * Listening:\n";
        $server->start;
        print " * Stopping...\n";

        # Perform deinitializes here
        # - remove server lockfile
        print " * Removing lock file...\n";
        unlink($server_lockfile_path);
        print "Done!\n";
    }
    # Launching process: wait until the daemon has published its address
    else
    {
        print " * Waiting for lockfile to be created";
        while (!-e $server_lockfile_path)
        {
            # Signal 0 only probes for existence. EPERM still means the process
            # is there, so only give up when the daemon has really gone -
            # otherwise this loop would never end.
            if (!kill(0, $pid) && !$!{EPERM})
            {
                die("\nError! TDBServer daemon [" . $pid . "] exited before creating lockfile - see: " . $daemon_err_path . "\n");
            }
            print '.';
            sleep(1);
        }
        print "\n * TDBServer lockfile created.\n";
        open(my $lock_in_fh, "<", $server_lockfile_path) or die("Error! Failed to open file for reading: " . $server_lockfile_path . "\nReason: " . $! . "\n");
        flock($lock_in_fh, LOCK_SH) or die("Error! Cannot lock file for reading: " . $server_lockfile_path . "\nReason: " . $! . "\n");
        my $address = <$lock_in_fh>;
        flock($lock_in_fh, LOCK_UN);
        close($lock_in_fh);
        if (defined $address && $address =~ /(^.+):(\d+)$/)
        {
            print " => Server now listening on " . $1 . ":" . $2 . "\n";
        }
        else
        {
            die ("Error! Failed to retrieve host and port information from lockfile!");
        }
    }

    print "===== Complete! =====\n";
}
exit(0);
265
# @function exitCheck
# A callback function, intended to be called periodically by the socket
# server (as its main_cb), to see whether the parent process (by pid) is
# actually still running. This will cover the case where the parent process
# (import.pl or build.pl) dies without properly asking the server to shutdown.
# Any arguments passed by the server are ignored. Nothing useful is returned.
sub exitCheck
{
    # Only chatter when debugging was requested - this runs repeatedly
    if ($debug)
    {
        lock($debug_log);
        print "[DEBUG] Has parent process gone away? [" . $parent_pid . "]\n";
    }
    # Parent PID not available or we aren't allowed to talk to it (debugging)
    if ($parent_pid == 0)
    {
        return;
    }
    # note: kill, when passed a first argument of 0, checks whether it's possible
    # to send a signal to the pid given as the second argument, and returns true
    # if it is. Thus it provides a means to determine if the parent process is
    # still running (and hence can be signalled).
    # A failure with EPERM means the process exists but belongs to somebody
    # else, so it must not be mistaken for the parent having gone away. ($! is
    # only consulted when kill has just failed.)
    if (kill(0, $parent_pid) || $!{EPERM})
    {
        return;
    }

    print " * Parent processs gone away... forcing server shutdown\n";
    $server->stop;
    # Synchronized debug log writing
    if ($debug)
    {
        lock($debug_log);
        $|++;
        print "[" . time() . "|MAIN] Parent process gone away... forcing server shutdown.\n\n";
        $|--;
    }
    return;
}
298
# /** @function process
#  * Called back by the socket server to handle one request. A request is
#  * either a special command of the form "#<c>:<argument>", where <c> is
#  * [a]dd listener, [r]emove listener or [q]uit, or a TDB command of the form
#  * "<database>:<record>", where <database> is [d]oc, [i]ndex or [s]rc and
#  * <record> is passed on to the TDB command line tool.
#  *
#  * @param  $data the request text
#  * @param  $ip   second argument supplied by the server (not used here)
#  * @param  $tid  identifier used to label debug log entries
#  * @return the response text: a "[SUCCESS]" / "[IGNORED]" / "[PENDING]"
#  *         message for special commands, the tool's raw output for TDB
#  *         commands, or "#ERROR#" when the request was not understood or
#  *         the tool could not be driven
#  */
sub process
{
    my ($data, $ip, $tid) = @_;
    my $result = "#ERROR#";
    # Synchronized debug log writing
    if ($debug)
    {
        lock($debug_log);
        $|++;
        print "[" . time() . "|" . $tid . "|RECV] " . $data . "\n";
        $|--;
    }
    # process special commands first
    if ($data =~ /^#([arq]):(.*)$/)
    {
        my ($command, $argument) = ($1, $2);
        # Set inside the locked sections below, acted upon once the locks have
        # been released so nothing is held while the server is being stopped
        my $stop_now = 0;
        # addlistener(<pid>)
        if ($command eq "a")
        {
            lock(%listeners);
            $listeners{$argument} = 1;
            $result = "[SUCCESS] added listener [" . scalar(keys(%listeners)) . " listeners]";
        }
        # removelistener(<pid>)
        elsif ($command eq "r")
        {
            # The listener count and the stop flag have to be examined as one
            # atomic step. Otherwise a [q]uit running in another thread can
            # count one listener, then this thread removes it and sees the
            # flag still clear, and only then the flag gets set - and nobody
            # ever stops the server. Locks are always taken in the order
            # %listeners then $should_stop to avoid deadlock.
            lock(%listeners);
            lock($should_stop);
            delete $listeners{$argument};
            my $listener_count = scalar(keys(%listeners));
            if ($should_stop == 1 && $listener_count == 0)
            {
                $stop_now = 1;
                $result = "[SUCCESS] removed last listener, stopping";
            }
            else
            {
                $result = "[SUCCESS] removed listener [" . $listener_count . " listeners]";
            }
        }
        # we may be asked to stop the server, but only by the process that created
        # us. If there are no listeners registered, we stop straight away,
        # otherwise we set a flag so that as soon as there are no listeners we
        # stop.
        elsif ($command eq "q")
        {
            if ($argument ne $parent_pid && $argument ne "*")
            {
                $result = "[IGNORED] can only be stopped by parent process";
            }
            else
            {
                # same lock order and reasoning as for [r]emove above
                lock(%listeners);
                lock($should_stop);
                if (scalar(keys(%listeners)) == 0)
                {
                    $stop_now = 1;
                    $result = "[SUCCESS] stopping";
                }
                else
                {
                    $should_stop = 1;
                    $result = "[PENDING] will stop when no more listeners";
                }
            }
        }
        # the locks were scoped to the branches above and are released by now
        if ($stop_now)
        {
            # server isn't shared, but the stop data member is!
            $server->stop;
        }
    }
    # Everything thing else should be a TDB command
    # form <database>:<key>:<value>
    # where: database is [d]oc, [i]ndex, or [s]rc
    elsif ($data =~ /^([dis]):(.+)$/s)
    {
        my ($database, $record) = ($1, $2);
        # Build path to database file (relative to the collection directory)
        my %tdb_location_for = ('d' => ['archives', 'archiveinf-doc.tdb'],
                                's' => ['archives', 'archiveinf-src.tdb'],
                                'i' => ['building', 'text', $collection . '.tdb']);
        my $tdb_path = util::filename_cat($ENV{'GSDLHOME'}, 'collect', $collection, @{$tdb_location_for{$database}});
        # Open harness to TDBCLI
        my @tdb_command = ($tdbexe, $tdb_path);
        my $buffer_to_tdb = '';
        my $buffer_from_tdb = '';
        my $tdb_harness;
        # start / pump / finish all die when the child misbehaves. Trap that
        # so a single bad request produces the error response instead of the
        # exception escaping from this callback.
        my $completed = eval
        {
            $tdb_harness = start(\@tdb_command, \$buffer_to_tdb, \$buffer_from_tdb);
            # Check the harness worked
            if (!$tdb_harness->pumpable)
            {
                die("Error! Harness to " . $tdbexe . " has gone away!\n");
            }
            # - write the data to the TDBCLI
            $buffer_to_tdb = $record . "\n";
            pump($tdb_harness) while (length($buffer_to_tdb));
            # - read any response from TDBCLI; a line of 70 dashes is taken as
            #   the end of the response
            pump($tdb_harness) until ($buffer_from_tdb =~ /-{70}/);
            # - note that this result doesn't include the [Server] prefix as it
            #   may be parsed for data by the client
            $result = $buffer_from_tdb;
            chomp($result);
            # Finished with harness
            finish($tdb_harness);
            1;
        };
        if (!$completed)
        {
            my $reason = $@ || "unknown error\n";
            warn("[" . time() . "|" . $tid . "] TDB command failed: " . $reason);
            # make sure no child process is left behind; errors while cleaning
            # up are of no further interest
            if (defined $tdb_harness)
            {
                eval { $tdb_harness->kill_kill; };
            }
        }
    }
    # Synchronized debug log writing
    if ($debug)
    {
        lock($debug_log);
        $|++;
        print "[" . time() . "|" . $tid . "|SEND] " . $result . "\n\n";
        $|--;
    }
    return $result;
}
447
# Prints the given message followed by the command line synopsis, then ends
# the script with exit status 0. Never returns.
#  @param $msg  the problem to report
sub printUsageAndExit
{
    my $problem = shift @_;
    print $problem . "\n\n"
        . "Usage: TDBServer.pl <parent_pid> <collectionname> [-nodaemon] [-debug]\n\n";
    exit(0);
}
455
4561;
Note: See TracBrowser for help on using the repository browser.