Changeset 24840 for gs2-extensions/parallel-building/trunk
- Timestamp:
- 2011-12-01T12:25:06+13:00 (12 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
gs2-extensions/parallel-building/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm
r24671 r24840 33 33 processor_cb => $param{processor_cb} || \&processor, 34 34 eom_marker => $param{eom_marker} || "\\n\\.\\n", 35 thread_pool => undef 35 thread_pool => undef, 36 listen_queue => 5 36 37 } => $class; 37 38 } … … 71 72 } 72 73 73 sub main_loop { 74 my $self= shift; 75 my $counter= 1; 76 until($stop) { 74 sub main_loop 75 { 76 my $self = shift; 77 my $counter = 1; 78 until($stop) 79 { 77 80 $self->{main_cb}->($counter++, sub {$self->stop}); 78 81 sleep $self->{main_yield}; … … 80 83 } 81 84 82 sub accept_requests { 83 my $self= shift; 84 my ($csocket, $n, %socket); 85 my $lsocket= new IO::Socket::INET( 86 %{$self->{socket_defaults}}, 87 Proto => 'tcp', 88 Listen => 1, 89 Reuse => 1); 85 sub accept_requests 86 { 87 my $self = shift; 88 my %socket; 89 # lsocket => listerner [sic] socket 90 my $lsocket = new IO::Socket::INET(%{$self->{socket_defaults}}, Proto => 'tcp', Listen => 1, Reuse => 1); 90 91 die "Can't create listerner socket. Server can't start. $!." unless $lsocket; 91 until($stop) { 92 $csocket= $lsocket->accept; 93 $n= fileno $csocket; 94 $socket{$n}= $csocket; 92 until($stop) 93 { 94 # csocket => connected socket (for doing the actual communications) 95 my $csocket = $lsocket->accept; 96 my $n = fileno $csocket; 97 $socket{$n} = $csocket; 98 # add this to the queue that a virtual horde of request_handlers are 99 # waiting upon (much like vultures) 95 100 $accept_queue->enqueue($n . ' ' . inet_ntoa($csocket->peeraddr)); 96 while($n= $closed_queue->dequeue_nb) { 101 # we then go ahead and destroy any sockets that we are no longer using. 102 # Note the use on a non-blocking dequeue (so it returns straight away) and 103 # that $n may be undefined (if the queue is empty) skipping the while loop 104 # altogether 105 while($n = $closed_queue->dequeue_nb) 106 { 107 # note: 2 = finished both reading and writing to this socket 97 108 $socket{$n}->shutdown(2); 98 delete $socket{$n}}} 109 delete $socket{$n} 110 } 111 } 112 # note: 2 = finished both reading and writing to this socket 99 113 $lsocket->shutdown(2); 100 114 } 101 115 116 # @function request_handler() 117 # 118 # There are *thread_count* request_handler threads all sitting, waiting, to 119 # process any incoming reqeuests received via the socket. Note that the whole 120 # 'race for data' issue is being handled by the Thread::Queue->dequeue command 121 # which is, according to the documentation, thread safe. I presume the actual 122 # dequeuing is somehow synchronized. 102 123 sub request_handler 103 124 { … … 107 128 until($stop) 108 129 { 109 ($n, $ip)= split / /, $accept_queue->dequeue; 130 # note: dequeue is blocking so this thread will wait here until something 131 # is enqueued 132 ($n, $ip)= split / /, $accept_queue->dequeue(); 110 133 next unless $n; 134 # open socket for reading and writing... reading first of course 111 135 open my $socket, '+<&=' . $n or die $!; 136 # this will read in the payload from the socket until the end of message 137 # sentinel value is detected. Note that this is done in a loop, so the 138 # client side of the socket may send multiple requests. Eventually the 139 # client will close their end of the socket causing the data to be 140 # undefined (maybe - I don't quite understand the defining construct - see 141 # below) and breaking out of the loop. 112 142 if(defined($data = $self->receive_client_request($socket))) 113 143 { 144 # we then pass the payload to the registered processor function (provided 145 # by the caller) to do the actual legwork, the result of which is written 146 # back to the socket. 114 147 my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop}); 115 148 print $socket $result, "\n.\n"; 116 149 } 150 # this is where the socket is closed... so why am I eventually hitting a 151 # "cannot assign address" problem? 117 152 close $socket; 153 # add this socket number to the list of sockets to be destroyed 118 154 $closed_queue->enqueue($n); 119 155 } … … 123 159 { 124 160 my ($self, $socket)= @_; 161 # how does this construct work? eom_marker is a scalar (a string) so how can 162 # it be assigned to an anonymous array? 125 163 my ($eom, $buffer, $data)= $self->{eom_marker}; 126 while ($buffer= <$socket>)164 while ($buffer = <$socket>) 127 165 { 128 $data .= $buffer;166 $data .= $buffer; 129 167 last if $data =~ s/$eom$//; 130 168 }
Note:
See TracChangeset
for help on using the changeset viewer.