source: gs2-extensions/tdb-edit/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm@ 28776

Last change on this file since 28776 was 28776, checked in by jmt12, 10 years ago

opps.. left one join behind. fixed now, and added comment about the lingering problem of the main thread exiting while child threads still exist (prompting the 'a thread exited while X threads were still running' warning message)

File size: 6.0 KB
Line 
1# @author Donnie Cameron (macnod)
2# @url https://github.com/macnod/DcServer
3# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
4
5# One lurking issue with the ThreadPoolServer is that several threads don't get
6# a chance to return before the server shuts down. In a perfect world they'd
7# all get a chance to notice the 'stop' flag has been set and thusly
8# stop. However, it is only by chance this happens as most will spend most of
9# their lives either blocked listening to a socket (accept_requests) or blocked
10# waiting to dequeue a message (request_handlers). Possible solutions, such as
11# interrupts, have problems of their own - interrupting mid-read of a socket
12# could be bad. Since this is non-fatal, and non-trivial to fix, I'm just
13# leaving it for now.
14
15package SocketsSwimmingThreadPoolServer;
16
17use lib '.';
18use threads;
19use threads::shared;
20use Thread::Queue;
21use IO::Socket;
22use Time::HiRes qw/sleep/;
23use strict;
24use warnings;
25
# Shutdown flag shared between all threads: stop() sets it, while main_loop(),
# accept_requests() and request_handler() poll it at the top of their loops.
my $stop :shared;

# Work queue filled by accept_requests() with "<fileno> <client ip>" strings
# and drained by the request_handler() threads.
my $accept_queue = Thread::Queue->new();

# Return queue filled by request_handler() with the fileno of each socket it
# has finished with, and drained by accept_requests() so it can shut down and
# forget that socket.
my $closed_queue = Thread::Queue->new();
29
# @function new()
#
# Constructor. Accepts a flat list of named parameters, each of which falls
# back to a default when missing or false:
#   host         - address to listen on (default 'localhost')
#   port         - port to listen on (default 8191)
#   thread_count - number of request_handler threads (default 10)
#   eom_marker   - regular expression marking the end of a client message
#                  (default matches a line containing only a full stop)
#   main_yield   - seconds main_loop sleeps between iterations (default 1)
#   main_cb      - callback run on every main_loop iteration (default no-op)
#   done_cb      - callback run by start() once main_loop ends (default no-op)
#   processor_cb - callback that turns a request into a reply (default
#                  \&processor)
# Returns the new object, blessed into the invocant's class.
sub new
{
  my ($proto, %param) = @_;
  my $class = ref($proto) || $proto;

  # Settings the caller may override, with their fallback values
  my %fallback_for = (
    thread_count => 10,
    main_yield   => 1,
    main_cb      => sub {},
    done_cb      => sub {},
    processor_cb => \&processor,
    eom_marker   => "\\n\\.\\n",
  );

  # Settings that are derived or fixed
  my %self = (
    socket_defaults => {
      LocalHost => $param{host} || 'localhost',
      LocalPort => $param{port} || 8191,
    },
    thread_pool  => undef,
    listen_queue => 5,
  );

  foreach my $key (keys %fallback_for)
  {
    $self{$key} = $param{$key} || $fallback_for{$key};
  }

  return bless \%self, $class;
}
49
# @function processor()
#
# Fallback processor_cb, used when the caller does not supply one. It ignores
# the request payload and the stop function, and simply reports (tagged with
# the handling thread's id and the client's ip) that nothing is implemented.
sub processor
{
  my (undef, $client_ip, $thread_id) = @_;
  return sprintf('[tid=%s; ip=%s] No function implemented', $thread_id, $client_ip);
}
57
# @function start()
#
# Brings the server up: spawns the dispatcher thread and the pool of handler
# threads (all detached), then runs main_loop in the calling thread until the
# stop flag is set, and finally invokes the done_cb callback.
sub start
{
  my $self = shift;

  # One detached thread accepts connections and dispatches them...
  my $dispatcher = threads->create(sub { $self->accept_requests });
  $dispatcher->detach;

  # ...and thread_count detached threads service the dispatched requests
  foreach my $worker_number (1 .. $self->{thread_count})
  {
    my $worker = threads->create(sub { $self->request_handler });
    $worker->detach;
  }

  # Background tasks run here, in the calling thread, until stop is requested
  $self->main_loop;

  $self->{done_cb}->();
}
77
# @function stop()
#
# Raises the shared stop flag that main_loop, accept_requests and
# request_handler all check at the top of their loops. The invocant is not
# used.
sub stop
{
  return $stop = 1;
}
83
# @function main_loop()
#
# Runs in the thread that called start(). Until the stop flag is raised it
# repeatedly calls main_cb - passing the iteration number (counting from 1)
# and a function the callback can use to stop the server - and then sleeps
# for main_yield seconds.
sub main_loop
{
  my $self = shift;
  my $counter = 1;
  while (!$stop)
  {
    my $iteration = $counter++;
    $self->{main_cb}->($iteration, sub { $self->stop });
    sleep $self->{main_yield};
  }
}
94
# @function accept_requests()
#
# Body of the dispatcher thread. Creates the listener socket (retrying once a
# second until that succeeds or the server is stopped), then loops until the
# stop flag is set: accept a connection, remember the connected socket under
# its file descriptor number, and enqueue "<fileno> <client ip>" for the
# request_handler threads. After each accept it also shuts down and forgets
# every socket the handlers have reported (via the closed queue) as finished.
#
# Note that accept blocks, so both the stop flag and the closed queue are
# only looked at when a new connection arrives.
sub accept_requests
{
  my $self = shift;
  # connected sockets we are still holding on to, keyed by fileno
  my %socket;
  # lsocket => listener socket
  my $lsocket;
  until ($lsocket)
  {
    # no point continuing to retry once the server has been told to stop
    return if $stop;
    # NOTE(review): the object carries a listen_queue setting (5) that is not
    # used here - the backlog stays at 1 as before. Confirm which is intended.
    $lsocket = IO::Socket::INET->new(%{$self->{socket_defaults}}, Proto => 'tcp', Listen => 1, Reuse => 1);
    if (!$lsocket)
    {
      print STDERR "Warning! Can't create listener socket so server can't start (trying again in 1 second).\n";
      print STDERR "Error message: \"" . $! . "\"\n\n";
      sleep(1);
    }
  }
  until ($stop)
  {
    # csocket => connected socket (for doing the actual communications)
    my $csocket = $lsocket->accept;
    # accept returns undef on failure; skip the dispatch rather than letting
    # a method call on undef kill this thread (and with it the whole server)
    if ($csocket)
    {
      my $n = fileno $csocket;
      $socket{$n} = $csocket;
      # peerhost yields undef (rather than dying like inet_ntoa) when the
      # peer address is unavailable, e.g. the client has already gone away
      my $ip = $csocket->peerhost;
      $ip = 'unknown' unless defined $ip;
      # add this to the queue that a virtual horde of request_handlers are
      # waiting upon (much like vultures)
      $accept_queue->enqueue($n . ' ' . $ip);
    }
    # we then go ahead and destroy any sockets that we are no longer using.
    # Note the use of a non-blocking dequeue (so it returns straight away);
    # it returns undef when the queue is empty, skipping the loop altogether
    while (defined(my $finished = $closed_queue->dequeue_nb))
    {
      my $old_socket = delete $socket{$finished};
      # ignore descriptors we have no record of
      next unless $old_socket;
      # note: 2 = finished both reading and writing to this socket
      $old_socket->shutdown(2);
    }
  }
  # note: 2 = finished both reading and writing to this socket
  $lsocket->shutdown(2);
}
134
# @function request_handler()
#
# There are *thread_count* request_handler threads all sitting, waiting, to
# process any incoming requests received via the socket. Each one loops until
# the stop flag is set: it dequeues a "<fileno> <client ip>" entry, opens a
# read/write handle on that descriptor, reads a single request (see
# receive_client_request), passes it to processor_cb and writes the reply
# followed by "\n.\n". The descriptor number is then handed back to
# accept_requests through the closed queue.
#
# The 'race for data' between handlers is left to Thread::Queue->dequeue,
# which is documented as thread safe.
#
# A failure to open the descriptor, or an exception thrown by processor_cb,
# is reported on STDERR and the connection dropped; the thread itself carries
# on so the pool does not shrink.
sub request_handler
{
  my $self = shift;
  until ($stop)
  {
    # note: dequeue is blocking so this thread will wait here until something
    # is enqueued
    my ($n, $ip) = split / /, $accept_queue->dequeue();
    next unless $n;
    # open the descriptor for reading and writing... reading first of course
    my $socket;
    if (!open($socket, '+<&=', $n))
    {
      print STDERR "Warning! Can't open socket descriptor $n: $!\n";
      # still hand the descriptor back so the dispatcher can dispose of it
      $closed_queue->enqueue($n);
      next;
    }
    # read the payload up to the end of message sentinel. Only one request is
    # read per connection; undef means the client sent nothing at all.
    my $data = $self->receive_client_request($socket);
    if (defined $data)
    {
      # we then pass the payload to the registered processor function (provided
      # by the caller) to do the actual legwork, the result of which is written
      # back to the socket.
      my $result;
      my $ok = eval
      {
        $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
        1;
      };
      if ($ok)
      {
        print $socket $result, "\n.\n";
      }
      else
      {
        print STDERR "Warning! Request processor failed: $@\n";
      }
    }
    close $socket;
    # add this socket number to the list of sockets to be destroyed
    $closed_queue->enqueue($n);
  }
}
176
# @function receive_client_request()
#
# Reads one request from the given handle. Lines are appended to the payload
# until the payload ends with the eom_marker pattern (which is then stripped)
# or the handle runs out of data.
#
# @param  $socket  handle to read from
# @return the payload; undef if nothing could be read at all. If the handle
#         is exhausted before the marker is seen, whatever was read is
#         returned as is.
sub receive_client_request
{
  my ($self, $socket) = @_;
  my $eom = $self->{eom_marker};
  # deliberately left undefined so an immediately closed connection yields
  # undef rather than an empty string
  my $data;
  while (defined(my $line = readline($socket)))
  {
    $data .= $line;
    if ($data =~ s/$eom$//)
    {
      last;
    }
  }
  return $data;
}
190
1911;
Note: See TracBrowser for help on using the repository browser.