source: gs2-extensions/tdb-edit/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm@ 25410

Last change on this file since 25410 was 25410, checked in by jmt12, 12 years ago

Adding a new infodbtype, tdbserver, to allow Greenstone to interact with a centralized TDBServer instance on systems that don't support multiple writers over NFS

File size: 5.2 KB
Line 
1# @author Donnie Cameron (macnod)
2# @url https://github.com/macnod/DcServer
3# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
4
5package SocketsSwimmingThreadPoolServer;
6
7use lib '.';
8use threads;
9use threads::shared;
10use Thread::Queue;
11use IO::Socket;
12use Time::HiRes qw/sleep/;
13use strict;
14use warnings;
15
16my $stop :shared;
17my $accept_queue = Thread::Queue->new;
18my $closed_queue = Thread::Queue->new;
19
20sub new {
21 # Params: host, port, thread_count, eom_marker, main_yield, main_cb,
22 # done_cb, processor_cb
23 my ($proto, %param)= @_;
24 my $class= ref($proto) || $proto;
25 bless +{
26 socket_defaults => +{
27 LocalHost => $param{host} || 'localhost',
28 LocalPort => $param{port} || 8191},
29 thread_count => $param{thread_count} || 10,
30 main_yield => $param{main_yield} || 5,
31 main_cb => $param{main_cb} || sub {},
32 done_cb => $param{done_cb} || sub {},
33 processor_cb => $param{processor_cb} || \&processor,
34 eom_marker => $param{eom_marker} || "\\n\\.\\n",
35 thread_pool => undef,
36 listen_queue => 5
37 } => $class;
38}
39
40# This callback (for processor_cb) simply explains no other processor function
41# defined.
42sub processor
43{
44 my ($data, $ip, $tid, $fnstop)= @_;
45 return "[tid=$tid; ip=$ip] No function implemented";
46}
47
48sub start
49{
50 my $self= shift;
51
52 # Start a thread to dispatch incoming requests
53 threads->create(sub {$self->accept_requests})->detach;
54
55 # Start the thread pool to handle dispatched requests
56 for (1 .. $self->{thread_count})
57 {
58 threads->create(sub {$self->request_handler})->detach;
59 }
60
61 # Start a loop for performing tasks in the background, while
62 # handling requests
63 $self->main_loop;
64
65 $self->{done_cb}->();
66}
67
68sub stop
69{
70 my $self= shift;
71 $stop= 1;
72}
73
74sub main_loop
75{
76 my $self = shift;
77 my $counter = 1;
78 until($stop)
79 {
80 $self->{main_cb}->($counter++, sub {$self->stop});
81 sleep $self->{main_yield};
82 }
83}
84
85sub accept_requests
86{
87 my $self = shift;
88 my %socket;
89 # lsocket => listerner [sic] socket
90 my $lsocket = new IO::Socket::INET(%{$self->{socket_defaults}}, Proto => 'tcp', Listen => 1, Reuse => 1);
91 die "Can't create listerner socket. Server can't start. $!." unless $lsocket;
92 until($stop)
93 {
94 # csocket => connected socket (for doing the actual communications)
95 my $csocket = $lsocket->accept;
96 my $n = fileno $csocket;
97 $socket{$n} = $csocket;
98 # add this to the queue that a virtual horde of request_handlers are
99 # waiting upon (much like vultures)
100 $accept_queue->enqueue($n . ' ' . inet_ntoa($csocket->peeraddr));
101 # we then go ahead and destroy any sockets that we are no longer using.
102 # Note the use on a non-blocking dequeue (so it returns straight away) and
103 # that $n may be undefined (if the queue is empty) skipping the while loop
104 # altogether
105 while($n = $closed_queue->dequeue_nb)
106 {
107 # note: 2 = finished both reading and writing to this socket
108 $socket{$n}->shutdown(2);
109 delete $socket{$n}
110 }
111 }
112 # note: 2 = finished both reading and writing to this socket
113 $lsocket->shutdown(2);
114}
115
116# @function request_handler()
117#
118# There are *thread_count* request_handler threads all sitting, waiting, to
119# process any incoming reqeuests received via the socket. Note that the whole
120# 'race for data' issue is being handled by the Thread::Queue->dequeue command
121# which is, according to the documentation, thread safe. I presume the actual
122# dequeuing is somehow synchronized.
123sub request_handler
124{
125 my $self= shift;
126 my ($n, $ip, $data);
127 my ($receive_time, $process_time, $send_time);
128 until($stop)
129 {
130 # note: dequeue is blocking so this thread will wait here until something
131 # is enqueued
132 ($n, $ip)= split / /, $accept_queue->dequeue();
133 next unless $n;
134 # open socket for reading and writing... reading first of course
135 open my $socket, '+<&=' . $n or die $!;
136 # this will read in the payload from the socket until the end of message
137 # sentinel value is detected. Note that this is done in a loop, so the
138 # client side of the socket may send multiple requests. Eventually the
139 # client will close their end of the socket causing the data to be
140 # undefined (maybe - I don't quite understand the defining construct - see
141 # below) and breaking out of the loop.
142 if(defined($data = $self->receive_client_request($socket)))
143 {
144 # we then pass the payload to the registered processor function (provided
145 # by the caller) to do the actual legwork, the result of which is written
146 # back to the socket.
147 my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
148 print $socket $result, "\n.\n";
149 }
150 # this is where the socket is closed... so why am I eventually hitting a
151 # "cannot assign address" problem?
152 close $socket;
153 # add this socket number to the list of sockets to be destroyed
154 $closed_queue->enqueue($n);
155 }
156}
157
158sub receive_client_request
159{
160 my ($self, $socket)= @_;
161 # how does this construct work? eom_marker is a scalar (a string) so how can
162 # it be assigned to an anonymous array?
163 my ($eom, $buffer, $data)= $self->{eom_marker};
164 while ($buffer = <$socket>)
165 {
166 $data .= $buffer;
167 last if $data =~ s/$eom$//;
168 }
169 return $data;
170}
171
1721;
Note: See TracBrowser for help on using the repository browser.