source: gs2-extensions/tdb-edit/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm@ 28775

Last change on this file since 28775 was 28775, checked in by jmt12, 10 years ago

Apparently trying to join() threads that have been detached() is actually wrong (detach means 'run this, I don't care about result' while join implies 'okay, wait until this is done and then return result'). Since I don't care about result values (they're always 1) I'll just remove the joining code since it doesn't seem to be portable to Medusa anyway

File size: 6.0 KB
Line 
1# @author Donnie Cameron (macnod)
2# @url https://github.com/macnod/DcServer
3# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
4
5package SocketsSwimmingThreadPoolServer;
6
7use lib '.';
8use threads;
9use threads::shared;
10use Thread::Queue;
11use IO::Socket;
12use Time::HiRes qw/sleep/;
13use strict;
14use warnings;
15
# Shutdown flag, shared between all threads. stop() sets it to 1; main_loop(),
# accept_requests() and request_handler() each test it at the top of their
# loops.
16my $stop :shared;
# Hand-off from accept_requests() to the request_handler() workers. Each item
# is the string "<fileno> <dotted client ip>" for a freshly accepted
# connection.
17my $accept_queue = Thread::Queue->new;
# Hand-off in the other direction: a worker enqueues the fileno of a connection
# it has finished with, and accept_requests() shuts that socket down and
# forgets it.
18my $closed_queue = Thread::Queue->new;
19
# Constructor. May be invoked on the class name or on an existing instance.
#
# Named parameters (all optional; any false value selects the default):
#   host          address the listener binds to          (default 'localhost')
#   port          TCP port the listener binds to         (default 8191)
#   thread_count  number of request_handler workers      (default 10)
#   main_yield    seconds main_loop sleeps between ticks (default 5)
#   main_cb       called by main_loop as ($tick, $stop_cb)          (default no-op)
#   done_cb       called by start() once main_loop has returned     (default no-op)
#   processor_cb  called per request as ($data, $ip, $tid, $stop_cb);
#                 its return value is sent back to the client
#                 (default \&processor)
#   eom_marker    regex source marking the end of a client message
#                 (default matches "\n.\n")
#
# thread_pool (undef) and listen_queue (5) are fixed here and cannot be set
# through the constructor.
#
# Returns the new blessed object.
sub new {
    my ($proto, %param) = @_;
    my $class = ref($proto) || $proto;

    # Where the listener socket will bind; expanded straight into the
    # IO::Socket::INET constructor arguments by accept_requests().
    my %bind_to = (
        LocalHost => $param{host} || 'localhost',
        LocalPort => $param{port} || 8191,
    );

    my $self = {
        socket_defaults => \%bind_to,
        thread_count    => $param{thread_count} || 10,
        main_yield      => $param{main_yield}   || 5,
        main_cb         => $param{main_cb}      || sub { },
        done_cb         => $param{done_cb}      || sub { },
        processor_cb    => $param{processor_cb} || \&processor,
        # Regex source, not a literal: backslash-n, escaped dot, backslash-n.
        eom_marker      => $param{eom_marker}   || "\\n\\.\\n",
        thread_pool     => undef,
        listen_queue    => 5,
    };

    return bless $self, $class;
}
39
# Default processor_cb, used when the caller supplies none. It does no real
# work: the reply is a fixed notice tagged with the worker thread id and the
# client address.
#
# Arguments: ($data, $ip, $tid, $fnstop) - only $ip and $tid are looked at.
# Returns:   the notice string.
sub processor
{
    my (undef, $ip, $tid) = @_;
    my $origin = "[tid=$tid; ip=$ip]";
    return $origin . ' No function implemented';
}
47
# Run the server: launch the dispatcher thread and the worker pool, then sit
# in main_loop() until a stop is requested, and finally invoke done_cb.
#
# All threads are detached, so nothing is ever joined and their return values
# are discarded. Returns whatever done_cb returns.
sub start
{
    my ($self) = @_;

    # One thread accepts connections and hands them to the workers.
    my $dispatcher = threads->create(sub { $self->accept_requests });
    $dispatcher->detach;

    # thread_count workers service the accepted connections.
    foreach my $worker_no (1 .. $self->{thread_count})
    {
        my $worker = threads->create(sub { $self->request_handler });
        $worker->detach;
    }

    # The calling thread runs the periodic background callback until stopped.
    $self->main_loop;

    return $self->{done_cb}->();
}
67
# Request shutdown by raising the shared $stop flag. The loops in main_loop(),
# accept_requests() and request_handler() test the flag at the top of each
# iteration, so a thread blocked inside an iteration only notices once that
# iteration completes. Returns 1.
sub stop
{
    my ($self) = @_;
    return $stop = 1;
}
73
# Background loop run by start() on the calling thread. Until the shared $stop
# flag is raised it repeatedly:
#   1. calls main_cb with a tick number (1, 2, 3, ...) and a closure that
#      stops the server when invoked;
#   2. sleeps for main_yield seconds.
sub main_loop
{
    my ($self) = @_;
    my $tick = 1;
    while (!$stop)
    {
        my $request_stop = sub { $self->stop };
        $self->{main_cb}->($tick++, $request_stop);
        sleep($self->{main_yield});
    }
}
84
# Dispatcher thread body (started once by start()).
#
# Binds the listener socket - retrying once a second until it succeeds or a
# stop is requested - and then loops:
#   * accept a connection, remember it in %socket under its fileno, and
#     enqueue "<fileno> <client ip>" on $accept_queue for a worker;
#   * drain $closed_queue (non-blocking) and shut down / forget every
#     connection a worker has reported as finished.
#
# The listener is created without a timeout, so accept() blocks: finished
# connections are only reaped, and $stop only re-checked, after the next
# connection attempt.
sub accept_requests
{
    my $self = shift;

    # Connected sockets keyed by fileno. Holding the object here keeps the
    # connection open while a worker talks to it through the same descriptor.
    my %socket;

    # lsocket => listener socket
    my $lsocket;
    until ($lsocket || $stop)
    {
        $lsocket = IO::Socket::INET->new(
            %{ $self->{socket_defaults} },
            Proto  => 'tcp',
            # Honour the backlog configured on the object instead of a
            # hard-coded 1.
            Listen => $self->{listen_queue} || 1,
            Reuse  => 1,
        );
        if (!$lsocket)
        {
            print STDERR "Error! Can't create listerner socket so server can't start (trying again in 1 second). Error message: \"" . $! . "\"\n";
            sleep(1);
        }
    }
    # Stop was requested before the listener could be bound.
    return unless $lsocket;

    until ($stop)
    {
        # csocket => connected socket (for doing the actual communications)
        my $csocket = $lsocket->accept;
        if ($csocket)
        {
            my $n  = fileno $csocket;
            # peerhost() yields undef (rather than dying like
            # inet_ntoa(undef)) when the client has already gone away.
            my $ip = $csocket->peerhost;
            if (defined $n && defined $ip)
            {
                $socket{$n} = $csocket;
                # hand the connection to whichever worker dequeues it first
                $accept_queue->enqueue($n . ' ' . $ip);
            }
            else
            {
                close $csocket;
            }
        }
        else
        {
            # accept() failed (interrupted, out of descriptors, ...). Do not
            # die - that would silently end the dispatcher - but back off
            # briefly so a persistent failure does not spin.
            sleep(0.1);
        }

        # Reap connections the workers have finished with. This runs even
        # after a failed accept() so descriptors are still released.
        while (defined(my $closed = $closed_queue->dequeue_nb))
        {
            my $finished = delete $socket{$closed};
            next unless $finished;
            # note: 2 = finished both reading and writing to this socket
            $finished->shutdown(2);
        }
    }

    # note: 2 = finished both reading and writing to this socket
    $lsocket->shutdown(2);
}
123
# @function request_handler()
#
# Worker thread body; start() launches *thread_count* of these. Each worker
# loops until the shared $stop flag is raised:
#   1. block on $accept_queue until the dispatcher enqueues a
#      "<fileno> <client ip>" item (Thread::Queue serialises the dequeue, so
#      exactly one worker receives each connection);
#   2. open a read/write handle on that file descriptor;
#   3. read ONE request (up to the end-of-message marker) and, if anything
#      was received, pass it to processor_cb and write the result followed
#      by "\n.\n" back to the client;
#   4. close the handle and report the fileno on $closed_queue so the
#      dispatcher can shut the connection down.
#
# A failure to open the descriptor, or an exception thrown by processor_cb,
# is reported with warn() and the connection is still handed back on
# $closed_queue; the worker itself keeps running.
#
# Because dequeue() blocks, an idle worker does not notice $stop until it
# receives another item. Workers are detached by start(), so they are never
# joined.
sub request_handler
{
    my $self = shift;
    until ($stop)
    {
        # note: dequeue is blocking so this thread will wait here until
        # something is enqueued
        my $job = $accept_queue->dequeue();
        next unless defined $job;
        my ($n, $ip) = split / /, $job;
        next unless $n;

        # '+<&=' re-opens the existing descriptor for reading and writing
        # (fdopen semantics - no new descriptor is created).
        my $socket;
        if (!open($socket, '+<&=', $n))
        {
            warn "request_handler: cannot open descriptor $n: $!\n";
            # still hand the connection back so the dispatcher releases it
            $closed_queue->enqueue($n);
            next;
        }

        # Read a single request; undef means the client sent nothing before
        # closing its end.
        my $data = $self->receive_client_request($socket);
        if (defined $data)
        {
            # Pass the payload to the registered processor function to do
            # the actual legwork. Guarded so that a dying callback cannot
            # take the worker (and the connection) down with it.
            my $result;
            my $ok = eval
            {
                $result = $self->{processor_cb}->($data, $ip, threads->tid, sub { $self->stop });
                1;
            };
            if ($ok)
            {
                print $socket $result, "\n.\n";
            }
            else
            {
                warn "request_handler: processor callback failed: $@";
            }
        }

        close $socket;
        # add this socket number to the list of sockets to be destroyed
        $closed_queue->enqueue($n);
    }
}
196
# Read one client message from $socket.
#
# Lines are appended to a buffer until the buffer ends with the object's
# eom_marker (a regex source string, anchored at the end with '$'); the marker
# is stripped and the remainder returned.
#
# Returns:
#   * the message without its marker;
#   * whatever was read, unmodified, if end-of-file arrives before a marker;
#   * undef if nothing at all could be read.
sub receive_client_request
{
    my ($self, $socket) = @_;
    my $eom = $self->{eom_marker};
    my $data;    # deliberately undef until the first line arrives

    while (defined(my $line = <$socket>))
    {
        $data .= $line;
        if ($data =~ s/$eom$//)
        {
            last;
        }
    }

    return $data;
}
210
2111;
Note: See TracBrowser for help on using the repository browser.