Changeset 24840


Ignore:
Timestamp:
2011-12-01T12:25:06+13:00 (12 years ago)
Author:
jmt12
Message:

Added lots of comments as I was trying to determine the cause of the exhausted socket pool problem

File:
1 edited

Legend:

Unmodified
Added
Removed
  • gs2-extensions/parallel-building/trunk/src/perllib/SocketsSwimmingThreadPoolServer.pm

    r24671 r24840  
    3333    processor_cb => $param{processor_cb} || \&processor,
    3434    eom_marker => $param{eom_marker} || "\\n\\.\\n",
    35     thread_pool => undef
     35    thread_pool => undef,
     36    listen_queue => 5
    3637  } => $class;
    3738}
     
    7172}
    7273
    73 sub main_loop {
    74   my $self= shift;
    75   my $counter= 1;
    76   until($stop) {
     74sub main_loop
     75{
     76  my $self = shift;
     77  my $counter = 1;
     78  until($stop)
     79  {
    7780    $self->{main_cb}->($counter++, sub {$self->stop});
    7881    sleep $self->{main_yield};
     
    8083}
    8184
    82 sub accept_requests {
    83   my $self= shift;
    84   my ($csocket, $n, %socket);
    85   my $lsocket= new IO::Socket::INET(
    86     %{$self->{socket_defaults}},
    87     Proto => 'tcp',
    88     Listen => 1,
    89     Reuse => 1);
     85sub accept_requests
     86{
     87  my $self = shift;
     88  my %socket;
     89  # lsocket => listerner [sic] socket
     90  my $lsocket = new IO::Socket::INET(%{$self->{socket_defaults}}, Proto => 'tcp', Listen => 1, Reuse => 1);
    9091  die "Can't create listerner socket. Server can't start. $!." unless $lsocket;
    91   until($stop) {
    92     $csocket= $lsocket->accept;
    93     $n= fileno $csocket;
    94     $socket{$n}= $csocket;
     92  until($stop)
     93  {
     94    # csocket => connected socket (for doing the actual communications)
     95    my $csocket = $lsocket->accept;
     96    my $n = fileno $csocket;
     97    $socket{$n} = $csocket;
     98    # add this to the queue that a virtual horde of request_handlers are
     99    # waiting upon (much like vultures)
    95100    $accept_queue->enqueue($n . ' ' . inet_ntoa($csocket->peeraddr));
    96     while($n= $closed_queue->dequeue_nb) {
     101    # we then go ahead and destroy any sockets that we are no longer using.
     102    # Note the use on a non-blocking dequeue (so it returns straight away) and
     103    # that $n may be undefined (if the queue is empty) skipping the while loop
     104    # altogether
     105    while($n = $closed_queue->dequeue_nb)
     106    {
     107      # note: 2 = finished both reading and writing to this socket
    97108      $socket{$n}->shutdown(2);
    98       delete $socket{$n}}}
     109      delete $socket{$n}
     110    }
     111  }
     112  # note: 2 = finished both reading and writing to this socket
    99113  $lsocket->shutdown(2);
    100114}
    101115
     116# @function request_handler()
     117#
     118# There are *thread_count* request_handler threads all sitting, waiting, to
     119# process any incoming reqeuests received via the socket. Note that the whole
     120# 'race for data' issue is being handled by the Thread::Queue->dequeue command
     121# which is, according to the documentation, thread safe. I presume the actual
     122# dequeuing is somehow synchronized.
    102123sub request_handler
    103124{
     
    107128  until($stop)
    108129  {
    109     ($n, $ip)= split / /, $accept_queue->dequeue;
     130    # note: dequeue is blocking so this thread will wait here until something
     131    # is enqueued
     132    ($n, $ip)= split / /, $accept_queue->dequeue();
    110133    next unless $n;
     134    # open socket for reading and writing... reading first of course
    111135    open my $socket, '+<&=' . $n or die $!;
     136    # this will read in the payload from the socket until the end of message
     137    # sentinel value is detected. Note that this is done in a loop, so the
     138    # client side of the socket may send multiple requests. Eventually the
     139    # client will close their end of the socket causing the data to be
     140    # undefined (maybe - I don't quite understand the defining construct - see
     141    # below) and breaking out of the loop.
    112142    if(defined($data = $self->receive_client_request($socket)))
    113143    {
     144      # we then pass the payload to the registered processor function (provided
     145      # by the caller) to do the actual legwork, the result of which is written
     146      # back to the socket.
    114147      my $result = $self->{processor_cb}->($data, $ip, threads->tid, sub {$self->stop});
    115148      print $socket $result, "\n.\n";
    116149    }
     150    # this is where the socket is closed... so why am I eventually hitting a
     151    # "cannot assign address" problem?
    117152    close $socket;
     153    # add this socket number to the list of sockets to be destroyed
    118154    $closed_queue->enqueue($n);
    119155  }
     
    123159{
    124160  my ($self, $socket)= @_;
     161  # how does this construct work? eom_marker is a scalar (a string) so how can
     162  # it be assigned to an anonymous array?
    125163  my ($eom, $buffer, $data)= $self->{eom_marker};
    126   while($buffer= <$socket>)
     164  while ($buffer = <$socket>)
    127165  {
    128     $data.= $buffer;
     166    $data .= $buffer;
    129167    last if $data =~ s/$eom$//;
    130168  }
Note: See TracChangeset for help on using the changeset viewer.