source: gs2-extensions/tdb-edit/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm@ 27403

Last change on this file since 27403 was 27403, checked in by jmt12, 11 years ago

just fixing a typo

File size: 5.8 KB
Line 
1# @author Donnie Cameron (macnod)
2# @url https://github.com/macnod/DcServer
3# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
4
5package SocketsSwimmingThreadPoolServer;
6
7use lib '.';
8use threads;
9use threads::shared;
10use Thread::Queue;
11use IO::Socket;
12use Time::HiRes qw/sleep/;
13use strict;
14use warnings;
15
# Shutdown flag shared between all threads. It starts out undefined (false);
# stop() sets it to 1 and every loop in this module polls it.
my $stop : shared;

# Work queue from the dispatcher thread to the request handlers. Each item is
# the string "<fileno> <client ip>" for one accepted connection.
my $accept_queue = Thread::Queue->new();

# Return queue from the request handlers to the dispatcher thread. Each item
# is the fileno of a connection the handler has finished with.
my $closed_queue = Thread::Queue->new();
19
sub new {
    # Constructor. Accepts named parameters, all optional; any parameter that
    # is missing or false falls back to the default shown:
    #   host         => interface to listen on            ('localhost')
    #   port         => TCP port to listen on             (8191)
    #   thread_count => number of request handler threads (10)
    #   main_yield   => seconds main_loop sleeps per pass (5)
    #   main_cb      => called on every main_loop pass    (no-op)
    #   done_cb      => called once main_loop has ended   (no-op)
    #   processor_cb => turns a request into a response   (\&processor)
    #   eom_marker   => end-of-message pattern, used as a regex ("\n\.\n")
    # May be invoked on a class name or on an existing instance.
    my ($proto, %param) = @_;
    my $class = ref($proto) || $proto;

    # Address settings later expanded into the listener socket constructor
    my %listen_on = (
        LocalHost => $param{host} || 'localhost',
        LocalPort => $param{port} || 8191,
    );

    my %self = (
        socket_defaults => \%listen_on,
        thread_count    => $param{thread_count} || 10,
        main_yield      => $param{main_yield}   || 5,
        main_cb         => $param{main_cb}      || sub {},
        done_cb         => $param{done_cb}      || sub {},
        processor_cb    => $param{processor_cb} || \&processor,
        eom_marker      => $param{eom_marker}   || "\\n\\.\\n",
        thread_pool     => undef,
        listen_queue    => 5,
    );

    return bless \%self, $class;
}
39
# Default processor_cb, used when the caller did not supply one. It ignores
# the request payload and the stop function, and answers with a fixed notice
# naming the handling thread and the client address.
sub processor
{
    my (undef, $client_ip, $thread_id) = @_;
    return sprintf '[tid=%s; ip=%s] No function implemented',
        $thread_id, $client_ip;
}
47
# Starts the server: launches the detached dispatcher thread and
# thread_count detached request handler threads, then runs main_loop in the
# calling thread until the server is stopped. Finally invokes done_cb and
# returns whatever it returns.
sub start
{
    my ($self) = @_;

    # One thread accepts incoming connections and queues them up
    my $dispatcher = sub { $self->accept_requests };
    threads->create($dispatcher)->detach;

    # The pool of threads that service the queued connections
    my $worker = sub { $self->request_handler };
    threads->create($worker)->detach for 1 .. $self->{thread_count};

    # Background tasks run here, in the caller's thread, while requests are
    # being handled by the threads above
    $self->main_loop;

    return $self->{done_cb}->();
}
67
# Requests shutdown by raising the shared stop flag that every loop in this
# module polls. Returns 1.
sub stop
{
    my ($self) = @_;
    return $stop = 1;
}
73
# Runs in the thread that called start(). Until the stop flag is raised it
# repeatedly calls main_cb with a pass counter (starting at 1) and a function
# that stops the server, then sleeps for main_yield seconds.
sub main_loop
{
    my ($self) = @_;
    my $request_stop = sub { $self->stop };
    my $pass = 1;
    while (!$stop)
    {
        $self->{main_cb}->($pass++, $request_stop);
        sleep $self->{main_yield};
    }
}
84
# @function accept_requests()
#
# Body of the dispatcher thread. Binds the listener socket (retrying once a
# second until it succeeds or the server is stopped), then accepts connections
# and hands each one to the request handlers as a "<fileno> <ip>" string on
# the accept queue.
#
# Every accepted socket object is kept in %socket until a handler sends its
# fileno back on the closed queue; only then is it shut down and dropped.
# The listener is polled with a timeout rather than blocking in accept(), so
# that finished sockets are released and the stop flag is noticed even when
# no new connection arrives.
sub accept_requests
{
    my $self = shift;
    # seconds to wait for a connection before checking the stop flag and the
    # closed queue again
    my $poll_interval = 1;
    # fileno => connected socket, for every connection currently handed out
    my %socket;

    # lsocket => listener socket
    my $lsocket;
    until ($lsocket || $stop)
    {
        $lsocket = IO::Socket::INET->new(
            %{$self->{socket_defaults}},
            Proto  => 'tcp',
            # honour the backlog configured by the constructor
            Listen => $self->{listen_queue} || 1,
            Reuse  => 1);
        if (!$lsocket)
        {
            print STDERR "Error! Can't create listerner socket. Server can't start. Trying again...\n$!.";
            sleep(1);
        }
    }
    # stopped before the listener could be created
    return unless $lsocket;

    require IO::Select;
    my $selector = IO::Select->new($lsocket);

    # Releases every socket whose fileno a handler has reported as finished.
    # Note the non-blocking dequeue: it returns undef straight away when the
    # queue is empty.
    my $release_finished = sub {
        while (defined(my $fd = $closed_queue->dequeue_nb))
        {
            my $finished = delete $socket{$fd} or next;
            # note: 2 = finished both reading and writing to this socket
            $finished->shutdown(2);
        }
    };

    until ($stop)
    {
        $release_finished->();
        next unless $selector->can_read($poll_interval);

        # csocket => connected socket (for doing the actual communications).
        # accept can fail (e.g. the client gave up in the meantime); that must
        # not take the whole dispatcher thread down.
        my $csocket = $lsocket->accept or next;
        my $n = fileno $csocket;
        # peerhost returns undef, instead of dying, when the peer address is
        # no longer available
        my $ip = $csocket->peerhost;
        if (!defined $n || !defined $ip)
        {
            close $csocket;
            next;
        }
        $socket{$n} = $csocket;
        # add this to the queue the request handlers are waiting upon
        $accept_queue->enqueue($n . ' ' . $ip);
    }

    # release whatever the handlers finished with before shutting down
    $release_finished->();
    # note: 2 = finished both reading and writing to this socket
    $lsocket->shutdown(2);
}
123
# @function request_handler()
#
# Body of each pool thread. There are *thread_count* of these, all waiting on
# the accept queue; Thread::Queue->dequeue is documented as thread safe, so
# each queued connection is picked up by exactly one handler.
#
# For every connection the handler reads ONE request (up to the end of message
# marker), passes it to processor_cb together with the client ip, the thread
# id and a function that stops the server, and writes the result back followed
# by "\n.\n". The fileno is then returned to the dispatcher via the closed
# queue so the dispatcher can release its socket.
#
# A failure while serving one connection (descriptor cannot be opened, the
# callback dies) is reported on STDERR and the thread carries on, so that a
# single bad request does not permanently shrink the pool or leave the
# dispatcher holding the socket.
sub request_handler
{
    my $self = shift;
    until ($stop)
    {
        # note: dequeue is blocking so this thread will wait here until
        # something is enqueued
        my $job = $accept_queue->dequeue();
        next unless defined $job;
        my ($n, $ip) = split / /, $job;
        next unless $n;

        # open the already connected descriptor for reading and writing
        my $socket;
        if (!open $socket, '+<&=', $n)
        {
            print STDERR "Error! Can't open descriptor $n: $!\n";
            # still hand the descriptor back so the dispatcher releases it
            $closed_queue->enqueue($n);
            next;
        }

        eval {
            # data is undefined when the client closed the connection without
            # sending anything
            my $data = $self->receive_client_request($socket);
            if (defined $data)
            {
                # the registered processor function does the actual legwork;
                # its result is written back to the socket
                my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
                print {$socket} $result, "\n.\n";
            }
            1;
        } or do {
            my $error = $@ || 'unknown error';
            print STDERR "Error! Request handler (tid=", threads->tid, ") failed: $error\n";
        };

        # finished with this handle; the dispatcher's own socket object for
        # the connection is dealt with when it sees $n on the closed queue
        close $socket;
        $closed_queue->enqueue($n);
    }
    # Nothing to join here: start() detaches every handler thread, and a
    # thread cannot join itself in any case.
    return;
}
194
# Reads one request from $socket, line by line, until the accumulated text
# ends with the end of message marker (eom_marker, applied as a regex). The
# marker is stripped and the rest returned. If the stream ends first, whatever
# was read so far is returned; that is undef when nothing was read at all.
sub receive_client_request
{
    my $self   = shift;
    my $socket = shift;

    my $eom = $self->{eom_marker};
    my $data;
    while (defined(my $line = <$socket>))
    {
        $data .= $line;
        if ($data =~ s/$eom$//)
        {
            last;
        }
    }
    return $data;
}
208
2091;
Note: See TracBrowser for help on using the repository browser.