# @author Donnie Cameron (macnod)
# @url https://github.com/macnod/DcServer
# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
package SocketsSwimmingThreadPoolServer;

use lib '.';
use threads;
use threads::shared;
use Thread::Queue;
use IO::Socket;
use Time::HiRes qw/sleep/;
use strict;
use warnings;

# Becomes true once stop() has been called; every loop in this module polls it.
my $stop :shared;

# Connections handed from accept_requests to the request_handler threads.
# Each item is the string "<fileno> <dotted-quad peer address>".
my $accept_queue = Thread::Queue->new;

# File numbers a request_handler has finished with. accept_requests drains
# this queue and shuts down / forgets the matching socket objects.
my $closed_queue = Thread::Queue->new;

# Constructor.
#
# Named parameters (all optional; a false value selects the default):
#   host          local address to bind                 (default 'localhost')
#   port          local port to bind                    (default 8191)
#   thread_count  number of request_handler threads     (default 10)
#   eom_marker    regex source marking end of a request (default '\n\.\n')
#   main_yield    seconds slept between main_cb calls   (default 5)
#   main_cb       called as main_cb($counter, $stop_fn) by main_loop
#   done_cb       called with no arguments when main_loop returns
#   processor_cb  request callback                      (default \&processor)
#   listen_queue  listen backlog for the server socket  (default 5)
#
# Returns the new blessed object.
sub new {
    my ($proto, %param) = @_;
    my $class = ref($proto) || $proto;

    my $self = {
        socket_defaults => {
            LocalHost => $param{host} || 'localhost',
            LocalPort => $param{port} || 8191,
        },
        thread_count => $param{thread_count} || 10,
        main_yield   => $param{main_yield}   || 5,
        main_cb      => $param{main_cb}      || sub {},
        done_cb      => $param{done_cb}      || sub {},
        processor_cb => $param{processor_cb} || \&processor,
        eom_marker   => $param{eom_marker}   || "\\n\\.\\n",
        thread_pool  => undef,
        listen_queue => $param{listen_queue} || 5,
    };

    return bless $self, $class;
}

# Default processor_cb, used when the caller supplies none. It ignores the
# request data and returns a fixed notice that names the thread id and the
# client address it was given.
sub processor {
    my ($data, $ip, $tid, $fnstop) = @_;
    return "[tid=$tid; ip=$ip] No function implemented";
}

# Starts the server: one detached dispatcher thread (accept_requests),
# thread_count detached worker threads (request_handler), then runs
# main_loop in the calling thread. When main_loop returns, done_cb is called.
sub start {
    my $self = shift;

    # Start a thread to dispatch incoming requests
    threads->create(sub { $self->accept_requests })->detach;

    # Start the thread pool to handle dispatched requests
    for (1 .. $self->{thread_count}) {
        threads->create(sub { $self->request_handler })->detach;
    }

    # Start a loop for performing tasks in the background, while
    # handling requests
    $self->main_loop;
    $self->{done_cb}->();
}

# Raises the shared stop flag that every loop in this module polls.
sub stop {
    my $self = shift;
    $stop = 1;
}

# Background loop: until the stop flag is raised, calls
# main_cb($counter, $stop_fn) and then sleeps main_yield seconds
# (Time::HiRes sleep, so fractions of a second are honoured).
# $counter starts at 1; $stop_fn is a closure that calls stop().
sub main_loop {
    my $self    = shift;
    my $counter = 1;

    until ($stop) {
        $self->{main_cb}->($counter++, sub { $self->stop });
        sleep $self->{main_yield};
    }
}

# Dispatcher loop, run in its own thread by start().
#
# Creates the listening TCP socket (dies if that fails), then for every
# accepted connection enqueues "<fileno> <peer ip>" on $accept_queue for the
# request_handler threads. After each accept it drains $closed_queue and
# shuts down the sockets the handlers have reported as finished.
sub accept_requests {
    my $self = shift;

    # fileno => connected socket object, kept until a handler reports it done
    my %socket;

    # lsocket => listener socket. The backlog comes from the object's
    # listen_queue setting rather than a hard-coded value.
    my $lsocket = IO::Socket::INET->new(
        %{ $self->{socket_defaults} },
        Proto  => 'tcp',
        Listen => $self->{listen_queue} || 1,
        Reuse  => 1,
    );
    die "Can't create listerner socket. Server can't start. $!."
        unless $lsocket;

    until ($stop) {
        # csocket => connected socket (for doing the actual communications).
        # accept blocks, so the stop flag is only re-checked after a
        # connection attempt.
        my $csocket = $lsocket->accept;

        if ($csocket) {
            my $peer = $csocket->peeraddr;
            if (defined $peer) {
                my $n = fileno $csocket;
                $socket{$n} = $csocket;

                # add this to the queue that a virtual horde of
                # request_handlers are waiting upon (much like vultures)
                $accept_queue->enqueue($n . ' ' . inet_ntoa($peer));
            }
            else {
                # No peer address is available, so there is nothing useful to
                # hand to a worker; drop the connection.
                $csocket->close;
            }
        }
        else {
            # accept returned nothing; pause briefly so a persistent failure
            # does not turn this loop into a busy spin.
            sleep 0.01;
        }

        # we then go ahead and destroy any sockets that we are no longer
        # using. dequeue_nb never blocks and returns undef once the queue is
        # empty, which ends the loop.
        while (defined(my $done = $closed_queue->dequeue_nb)) {
            my $finished = delete $socket{$done} or next;

            # note: 2 = finished both reading and writing to this socket
            $finished->shutdown(2);
        }
    }

    # note: 2 = finished both reading and writing to this socket
    $lsocket->shutdown(2);
}

# @function request_handler()
#
# There are *thread_count* request_handler threads all sitting, waiting, to
# process any incoming requests received via the socket. Note that the whole
# 'race for data' issue is being handled by the Thread::Queue->dequeue command
# which is, according to the documentation, thread safe. I presume the actual
# dequeuing is somehow synchronized.
sub request_handler {
    # Worker loop run by each pool thread. Repeatedly takes one
    # "<fileno> <ip>" item from $accept_queue, reads a single request from
    # that connection, passes it to processor_cb, writes the reply followed
    # by "\n.\n", and finally reports the fileno on $closed_queue.
    my $self = shift;

    until ($stop) {
        # note: dequeue is blocking so this thread will wait here until
        # something is enqueued
        my ($n, $ip) = split / /, $accept_queue->dequeue();
        next unless $n;

        # Attach a read/write handle to the already-open descriptor
        # ("&=" reuses the fd number instead of duplicating it).
        my $socket;
        unless (open $socket, '+<&=', $n) {
            # Do not die here: that would silently end this detached worker
            # and the descriptor would never be reported as finished.
            warn "request_handler: cannot attach to fd $n: $!";
            $closed_queue->enqueue($n);
            next;
        }

        # Read one request: everything up to the end-of-message marker, or
        # up to end of input. undef means the client sent nothing at all.
        my $data = $self->receive_client_request($socket);

        if (defined $data) {
            # Pass the payload to the registered processor function (provided
            # by the caller) to do the actual legwork; its result is written
            # back to the socket. The callback receives the payload, the
            # client address, this thread's id and a closure that stops the
            # server.
            my $result;
            my $ok = eval {
                $result = $self->{processor_cb}->(
                    $data, $ip, threads->tid, sub { $self->stop }
                );
                1;
            };

            if ($ok) {
                print {$socket} $result, "\n.\n"
                    or warn "request_handler: write to fd $n failed: $!";
            }
            else {
                # A failing callback must not take the worker down with it;
                # log it and fall through so the connection is still closed.
                warn "request_handler: processor_cb failed: $@";
            }
        }

        # Done with our handle on this connection.
        close $socket;

        # add this socket number to the list of sockets to be destroyed
        $closed_queue->enqueue($n);
    }
}

# Reads one request from $socket.
#
# Lines are appended to a buffer until the buffer ends with the object's
# eom_marker (which is interpolated into the match as a regex) or the input
# is exhausted. The marker itself is stripped from the returned text.
#
# Returns the accumulated text, or undef when nothing could be read.
sub receive_client_request {
    my ($self, $socket) = @_;

    my $eom = $self->{eom_marker};
    my $data;

    while (defined(my $buffer = <$socket>)) {
        $data .= $buffer;
        last if $data =~ s/$eom$//;
    }

    return $data;
}

1;