source: gs2-extensions/tdb-edit/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm@ 26997

Last change on this file since 26997 was 26997, checked in by jmt12, 11 years ago

Attempting to make the thread joining code compatible between core Perl and MCPAN Perl, which ship slightly different versions of the threads module

File size: 5.8 KB
Line 
1# @author Donnie Cameron (macnod)
2# @url https://github.com/macnod/DcServer
3# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
4
5package SocketsSwimmingThreadPoolServer;
6
7use lib '.';
8use threads;
9use threads::shared;
10use Thread::Queue;
11use IO::Socket;
12use Time::HiRes qw/sleep/;
13use strict;
14use warnings;
15
# Flag visible to every thread; stop() sets it to 1 and each loop in this
# module polls it to know when to finish.
my $stop :shared;

# $accept_queue carries "<fileno> <client ip>" strings from the accepting
# thread to the request handlers; $closed_queue carries file descriptor
# numbers back once a handler has finished with a connection.
my ($accept_queue, $closed_queue) = (Thread::Queue->new(), Thread::Queue->new());
19
# Constructor. Builds a server object from named parameters; any parameter
# that is missing or false is replaced by its default.
#
# Params (all optional):
#   host         - address the listener binds to        (default 'localhost')
#   port         - TCP port the listener binds to       (default 8191)
#   thread_count - number of request handler threads    (default 10)
#   eom_marker   - regex source that marks the end of a client message
#                  (default matches newline, dot, newline)
#   main_yield   - pause between two main_cb invocations (default 5)
#   main_cb      - callback run on every main loop pass (default: no-op)
#   done_cb      - callback run after the main loop ends (default: no-op)
#   processor_cb - callback turning a request into a reply
#                  (default: \&processor)
#
# Returns: the new object, blessed into the invocant's class.
sub new {
    my ($proto, %param) = @_;

    # allow both Class->new(...) and $object->new(...)
    my $class = ref($proto) || $proto;

    # arguments later handed to the listener socket constructor
    my %bind_to = (
        LocalHost => $param{host} || 'localhost',
        LocalPort => $param{port} || 8191,
    );

    my $self = {
        socket_defaults => \%bind_to,
        thread_count    => $param{thread_count} || 10,
        main_yield      => $param{main_yield}   || 5,
        main_cb         => $param{main_cb}      || sub {},
        done_cb         => $param{done_cb}      || sub {},
        processor_cb    => $param{processor_cb} || \&processor,
        eom_marker      => $param{eom_marker}   || "\\n\\.\\n",
        thread_pool     => undef,
        listen_queue    => 5,
    };

    return bless $self, $class;
}
39
# Default processor_cb, installed by new() when the caller supplies none.
# It ignores the request and answers with a fixed notice.
#
# Params: the request payload (unused), the client address, the id of the
#         handling thread, and a stop callback (unused).
# Returns: a string naming the thread id and client address.
sub processor {
    my (undef, $ip, $tid) = @_;
    return sprintf '[tid=%s; ip=%s] No function implemented', $tid, $ip;
}
47
# Runs the server: launches the accepting thread and thread_count handler
# threads (all detached), then stays in main_loop until the server is
# stopped, and finally invokes done_cb.
sub start {
    my ($self) = @_;

    my $accept_connections = sub { $self->accept_requests };
    my $handle_requests    = sub { $self->request_handler };

    # one thread dispatches incoming connections ...
    threads->create($accept_connections)->detach;

    # ... and a pool of threads serves them
    threads->create($handle_requests)->detach for 1 .. $self->{thread_count};

    # background tasks run here, in the calling thread, while requests are
    # being handled
    $self->main_loop;

    $self->{done_cb}->();
}
67
# Asks every loop in this module to finish by raising the shared stop flag.
# The invocant is not used. Returns 1.
sub stop {
    return $stop = 1;
}
73
# Repeatedly calls main_cb, pausing main_yield between calls, until the
# shared stop flag is set. main_cb receives the number of the current pass
# (starting at 1) and a callback that stops the server.
sub main_loop {
    my ($self) = @_;
    my $next_pass = 1;
    while (!$stop) {
        my $pass = $next_pass++;
        $self->{main_cb}->($pass, sub { $self->stop });
        sleep $self->{main_yield};
    }
}
84
# Body of the dispatcher thread. Creates the listener socket (retrying once
# a second until that works), then accepts connections until the shared stop
# flag is set. Every accepted connection is announced to the request
# handlers as a "<fileno> <client ip>" string on $accept_queue; descriptors
# the handlers report back on $closed_queue are shut down and forgotten.
sub accept_requests
{
    my $self = shift;

    # connected sockets this thread still holds, keyed by file descriptor
    my %socket;

    # lsocket => listener socket
    my $lsocket;
    until ($lsocket)
    {
        # no point in retrying once the server has been told to stop
        return if $stop;
        # Honour the backlog configured by new(); fall back to the former
        # hard-coded value of 1 if the object carries none.
        $lsocket = IO::Socket::INET->new(
            %{$self->{socket_defaults}},
            Proto  => 'tcp',
            Listen => $self->{listen_queue} || 1,
            Reuse  => 1);
        if (!$lsocket)
        {
            print STDERR "Error! Can't create listerner socket. Server can't start. Trying again...\n$!.";
            sleep(1);
        }
    }

    until ($stop)
    {
        # csocket => connected socket (for doing the actual communications)
        my $csocket = $lsocket->accept;
        if ($csocket)
        {
            my $n = fileno $csocket;
            $socket{$n} = $csocket;
            # peerhost returns undef instead of dying when the peer address
            # is no longer available (the client may already be gone)
            my $ip = $csocket->peerhost;
            $ip = 'unknown' unless defined $ip;
            # hand the connection to whichever request_handler dequeues first
            $accept_queue->enqueue($n . ' ' . $ip);
        }
        else
        {
            # accept failed (for instance an interrupted call or a lack of
            # descriptors): pause briefly rather than spin, then fall
            # through so finished sockets are still released below.
            # Four-argument select gives a sub-second pause.
            select(undef, undef, undef, 0.1);
        }

        # Release the sockets the handlers are done with. dequeue_nb does
        # not block and returns undef once the queue is empty.
        while (defined(my $closed = $closed_queue->dequeue_nb))
        {
            my $finished = delete $socket{$closed} or next;
            # note: 2 = finished both reading and writing to this socket
            $finished->shutdown(2);
        }
    }

    # note: 2 = finished both reading and writing to this socket
    $lsocket->shutdown(2);
}
123
# @function request_handler()
#
# Body of each pool thread. There are *thread_count* of these, all blocked on
# $accept_queue waiting for a connection to serve; Thread::Queue's dequeue is
# documented as thread safe, so each connection goes to exactly one handler.
#
# For every "<fileno> <client ip>" entry the handler reads ONE request from
# the connection, passes it to processor_cb together with the client ip, the
# thread id and a stop callback, writes the result followed by "\n.\n", closes
# its handle and reports the descriptor on $closed_queue so the accepting
# thread can release the socket.
#
# The handler simply returns when the stop flag is set. It does not try to
# join itself: a thread cannot join itself, and start() creates the handlers
# detached, so there is nothing to join.
sub request_handler
{
    my $self = shift;
    until ($stop)
    {
        # note: dequeue is blocking so this thread will wait here until
        # something is enqueued
        my $job = $accept_queue->dequeue();
        next unless defined $job;
        my ($n, $ip) = split / /, $job;
        next unless $n;

        # attach a read/write handle to the already open descriptor
        my $socket;
        if (!open($socket, '+<&=', $n))
        {
            # Keep this pool thread alive, and still report the descriptor
            # so the accepting thread can release the socket.
            warn '[tid=' . threads->tid . "] Can't open descriptor $n: $!\n";
            $closed_queue->enqueue($n);
            next;
        }

        # read the payload up to the end of message marker; undef means the
        # client closed the connection without sending anything
        my $data = $self->receive_client_request($socket);
        if (defined $data)
        {
            # the registered processor function does the actual legwork and
            # its result is written back to the client
            my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
            print $socket $result, "\n.\n";
        }
        close $socket;

        # add this socket number to the list of sockets to be destroyed
        $closed_queue->enqueue($n);
    }
    return;
}
194
# Reads one client message from $socket, line by line, until the text read
# so far ends with the end of message marker (the regex source stored in
# eom_marker) or the handle reaches end of file.
#
# Returns: the message with the marker removed; everything that was read if
#          the marker never appeared; undef if nothing could be read at all.
sub receive_client_request {
    my ($self, $socket) = @_;
    my $eom = $self->{eom_marker};
    my $data;
    while (defined(my $line = <$socket>)) {
        $data .= $line;
        # strip the marker and stop as soon as it terminates the message
        return $data if $data =~ s/$eom$//;
    }
    return $data;
}
208
2091;
Note: See TracBrowser for help on using the repository browser.