source: gs2-extensions/parallel-building/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm@ 24671

Last change on this file since 24671 was 24671, checked in by jmt12, 13 years ago

Simple server and client objects that are based around a socket that accepts requests then spawns off threads to action them. I didn't write these - they were downloaded from github - and I'm unsure of the licensing (but assume GPL)

File size: 3.1 KB
Line 
1# @author Donnie Cameron (macnod)
2# @url https://github.com/macnod/DcServer
3# @readme http://donnieknows.com/blog/perl-sockets-swimming-thread-pool
4
5package SocketsSwimmingThreadPoolServer;
6
7use lib '.';
8use threads;
9use threads::shared;
10use Thread::Queue;
11use IO::Socket;
12use Time::HiRes qw/sleep/;
13use strict;
14use warnings;
15
16my $stop :shared;
17my $accept_queue = Thread::Queue->new;
18my $closed_queue = Thread::Queue->new;
19
20sub new {
21 # Params: host, port, thread_count, eom_marker, main_yield, main_cb,
22 # done_cb, processor_cb
23 my ($proto, %param)= @_;
24 my $class= ref($proto) || $proto;
25 bless +{
26 socket_defaults => +{
27 LocalHost => $param{host} || 'localhost',
28 LocalPort => $param{port} || 8191},
29 thread_count => $param{thread_count} || 10,
30 main_yield => $param{main_yield} || 5,
31 main_cb => $param{main_cb} || sub {},
32 done_cb => $param{done_cb} || sub {},
33 processor_cb => $param{processor_cb} || \&processor,
34 eom_marker => $param{eom_marker} || "\\n\\.\\n",
35 thread_pool => undef
36 } => $class;
37}
38
39# This callback (for processor_cb) simply explains no other processor function
40# defined.
41sub processor
42{
43 my ($data, $ip, $tid, $fnstop)= @_;
44 return "[tid=$tid; ip=$ip] No function implemented";
45}
46
47sub start
48{
49 my $self= shift;
50
51 # Start a thread to dispatch incoming requests
52 threads->create(sub {$self->accept_requests})->detach;
53
54 # Start the thread pool to handle dispatched requests
55 for (1 .. $self->{thread_count})
56 {
57 threads->create(sub {$self->request_handler})->detach;
58 }
59
60 # Start a loop for performing tasks in the background, while
61 # handling requests
62 $self->main_loop;
63
64 $self->{done_cb}->();
65}
66
67sub stop
68{
69 my $self= shift;
70 $stop= 1;
71}
72
73sub main_loop {
74 my $self= shift;
75 my $counter= 1;
76 until($stop) {
77 $self->{main_cb}->($counter++, sub {$self->stop});
78 sleep $self->{main_yield};
79 }
80}
81
82sub accept_requests {
83 my $self= shift;
84 my ($csocket, $n, %socket);
85 my $lsocket= new IO::Socket::INET(
86 %{$self->{socket_defaults}},
87 Proto => 'tcp',
88 Listen => 1,
89 Reuse => 1);
90 die "Can't create listerner socket. Server can't start. $!." unless $lsocket;
91 until($stop) {
92 $csocket= $lsocket->accept;
93 $n= fileno $csocket;
94 $socket{$n}= $csocket;
95 $accept_queue->enqueue($n . ' ' . inet_ntoa($csocket->peeraddr));
96 while($n= $closed_queue->dequeue_nb) {
97 $socket{$n}->shutdown(2);
98 delete $socket{$n}}}
99 $lsocket->shutdown(2);
100}
101
102sub request_handler
103{
104 my $self= shift;
105 my ($n, $ip, $data);
106 my ($receive_time, $process_time, $send_time);
107 until($stop)
108 {
109 ($n, $ip)= split / /, $accept_queue->dequeue;
110 next unless $n;
111 open my $socket, '+<&=' . $n or die $!;
112 if(defined($data = $self->receive_client_request($socket)))
113 {
114 my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
115 print $socket $result, "\n.\n";
116 }
117 close $socket;
118 $closed_queue->enqueue($n);
119 }
120}
121
122sub receive_client_request
123{
124 my ($self, $socket)= @_;
125 my ($eom, $buffer, $data)= $self->{eom_marker};
126 while($buffer= <$socket>)
127 {
128 $data.= $buffer;
129 last if $data =~ s/$eom$//;
130 }
131 return $data;
132}
133
1341;
Note: See TracBrowser for help on using the repository browser.