PerlMonks  

Re^2: multithreaded tcp listener with IO::Socket

by Random_Walk (Prior)
on May 15, 2006 at 12:22 UTC ( [id://549457]=note )


in reply to Re: multithreaded tcp listener with IO::Socket
in thread multithreaded tcp listener with IO::Socket

Very good points. The SIGCHLD handler was left over from my first, child-spawning implementation. Failing to detach my threads and to close my socket were my own errors, coded last thing on a Friday. All now fixed.

Thanks a lot,
R.

Pereant, qui ante nos nostra dixerunt!

Re^3: multithreaded tcp listener with IO::Socket
by BrowserUk (Patriarch) on May 15, 2006 at 12:51 UTC

    In general, I currently favour using a thread pool over creating a new thread for each client, especially when each connection is so brief. One advantage is that it eliminates the major possible source of memory leaks (the thread creation/cloning/destruction cycle).

    With a duty cycle of a few milliseconds in every 300 seconds, unless all your clients are synchronised you would probably only need a pool of 2 or 3 threads per 100 clients. Even if the whole pool were busy when a new client connects, with each communication being so brief, the new client would only have to wait a few milliseconds at most to be serviced.
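The sizing estimate above can be checked with a line of arithmetic. This is only a back-of-envelope sketch: the 5 ms service time is an assumed stand-in for "a few milliseconds", and the variable names are made up for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;

my $clients      = 100;     # clients sending heartbeats
my $service_time = 0.005;   # seconds per connection (ASSUMED: "a few milliseconds")
my $interval     = 300;     # seconds between heartbeats from one client

# Average number of connections being serviced at any instant,
# assuming the clients are not synchronised.
my $avg_busy = $clients * $service_time / $interval;

printf "average connections in service: %.4f\n", $avg_busy;
```

The average comes out far below one busy thread, which is why a pool of 2 or 3 threads leaves ample headroom for occasional bursts.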

    Modifying your program to use a thread pool would be slightly more complex, but not grossly so. I will try to post something later today by way of demonstration.
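As a rough illustration of the pool idea (not the demonstration promised above), here is a minimal sketch in which a fixed set of threads pulls jobs from one Thread::Queue. The names `pool_worker`, `$POOL_SIZE` and the `client-N` job strings are hypothetical, and the jobs are plain strings rather than sockets.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $POOL_SIZE = 3;                     # hypothetical pool size
my $jobs      = Thread::Queue->new;    # main thread -> pool
my $results   = Thread::Queue->new;    # pool -> main thread

# Each worker loops on the queue until it dequeues undef.
sub pool_worker {
    my $tid = threads->tid;
    while (defined(my $job = $jobs->dequeue)) {
        $results->enqueue("worker $tid handled $job");
    }
}

# The threads are created once, up front, and reused for every job.
my @pool = map { threads->create(\&pool_worker) } 1 .. $POOL_SIZE;

$jobs->enqueue("client-$_") for 1 .. 100;

# One undef per worker tells the pool to shut down after the real jobs.
$jobs->enqueue(undef) for @pool;
$_->join for @pool;

my @handled;
while (defined(my $r = $results->dequeue_nb)) {
    push @handled, $r;
}
print scalar(@handled), " jobs handled by $POOL_SIZE threads\n";
```

Because the threads are created only once, the creation/cloning/destruction cost is paid `$POOL_SIZE` times instead of once per client.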


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Hi BrowserUk,

      I tried using a thread pool but cannot seem to pass the socket handle through the thread queue. Here is the error thrown by Thread::Queue. It happens as soon as the first client connects.

      Updated, see below
      C:\test>perl new_ecg.pl
      Server ready. Waiting for connections on 8000 ...
      Invalid value for shared scalar at C:/Perl/lib/Thread/Queue.pm line 90.
      A thread exited while 12 threads were running.
      Here is the code I am using:

      #!/usr/bin/perl
      use strict;
      use warnings;
      use IO::Socket;
      use threads;
      use Thread::Queue;

      # get the port to bind to or default to 8000
      my $port = $ARGV[0] || 8000;
      my $listeners = 10;

      # a hash to record client machines and thread queue for internal communication
      my %clients;
      my $mqueue = Thread::Queue -> new;
      my $squeue = Thread::Queue -> new;

      threads->create ("monitor", $mqueue);
      for (1..$listeners) {
          threads->create ("read_data", $mqueue, $squeue)->detach;
      }

      # create the listen socket
      my $listen_socket = IO::Socket::INET->new(LocalPort => $port,
                                                Listen    => 20,
                                                Proto     => 'tcp',
                                                Reuse     => 1);

      # make sure we are bound to the port
      die "Can't create a listening socket: $@" unless $listen_socket;

      warn "Server ready. Waiting for connections on $port ... \n";

      # wait for connections at the accept call
      while (my $connection = $listen_socket->accept) {
          # put the connection on the queue for a reader thread
          $squeue -> enqueue($connection);
      }

      sub read_data {
          # accept data from the socket and put it on the queue
          my ($mqueue, $squeue) = @_;
          while (my $socket = $squeue -> dequeue) {
              while (<$socket>) {
                  print "listener got: $_";
                  $mqueue -> enqueue(time." $_");
              }
              close $socket;
          }
      }

      sub monitor {
          my $mqueue = shift;
          while (1) {
              while ($mqueue -> pending) {
                  my $data = $mqueue -> dequeue;
                  print "monitor got: $data";
                  $data =~ /(\d+) Heartbeat from (\S+) next one in (\d+) minutes/;
                  my $time   = $1;
                  my $client = $2;
                  my $cycle  = $3;
                  if (defined $clients{$client} and $clients{$client} -> [0] eq 'NAK') {
                      print "$client sent a beat again\n";
                  }
                  $clients{$client} = [ 'OK', $time + $cycle * 60 ];
              }
              for my $client (keys %clients) {
                  next if $clients{$client}->[0] eq 'NAK';
                  next if $clients{$client}->[1] > time;
                  print "$client missed a heartbeat, expected at $clients{$client}->[1], now it is ".time."\n";
                  $clients{$client}->[0] = 'NAK';
              }
              sleep 30;
          }
      }
      Update
      I searched on that error and found that Thread::Queue can only handle simple scalar values. I downloaded and installed Thread::Queue::Any, which can handle more complex objects, but it still seems to choke on the handle:

      [tivadm@rtmr]/home/tivoli/robin$ ./new_ecg.pl
      Server ready. Waiting for connections on 8000 ...
      Can't store GLOB items at ../../lib/Storable.pm (autosplit into ../../lib/auto/Storable/_freeze.al) line 282, at /usr/opt/perl5/lib/site_perl/5.8.0/Thread/Queue/Any.pm line 30
      A thread exited while 12 other threads were still running.

      Any suggestions on how I can pass the sockets to the thread pool? Perhaps I can find another data structure that can hold them and then pass some form of pointer to that.

      Cheers,
      R.


        This is a known problem. The solution is to pass the fileno( $socket ) between the threads. As this is just an integer, it bypasses the restrictions on shared variables. Once you get the fileno in the destination thread, you can re-open the socket there and use it in the normal way.

        See Re: FileHandles and threads & Re^3: Passing globs between threads for some background and example code, plus some discussion of the caveats. The main problem with this approach is that you have to ensure that the socket returned by accept() does not go out of scope (causing the socket to be closed) before the thread has had a chance to reopen it. That's easily achieved by pushing a copy of the socket handle into a (non-shared) array in the accept thread, as well as queueing its fileno.

        However, you then have to arrange for those copies to be cleaned up so that you do not leak handles. The method I've evolved to handle that involves saving copies of the handles in the accept thread as the values in a non-shared hash, keyed by the fileno. Once the client thread has finished with the socket, it closes its handle and queues the fileno back to the accept thread, which uses it to close the original handle as part of the accept loop.
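The scheme described above might be sketched as follows. This is an illustrative reconstruction under stated assumptions, not the test program mentioned in this thread: the names `pool_reader`, `$work_q`, `$done_q`, `$msg_q` and `%pending` are hypothetical, the descriptor is re-opened with a dup-mode `open` (`'+<&'`), and a throwaway loopback client stands in for a real heartbeat sender so that the script terminates.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use IO::Socket::INET;

my $work_q = Thread::Queue->new;   # accept thread -> pool: filenos to service
my $done_q = Thread::Queue->new;   # pool -> accept thread: filenos to release
my $msg_q  = Thread::Queue->new;   # pool -> main: lines read from clients

sub pool_reader {
    while (defined(my $fileno = $work_q->dequeue)) {
        # Re-open the descriptor in this thread. '+<&' dups it, so closing
        # $client below leaves the accept thread's original handle open.
        open(my $client, '+<&', $fileno)
            or die "Cannot dup descriptor $fileno: $!";
        while (my $line = <$client>) {
            $msg_q->enqueue($line);
        }
        close $client;
        $done_q->enqueue($fileno);   # tell the accept thread we are finished
    }
}

# Start the pool before any client socket exists, so that no accepted
# handle gets cloned into the worker threads.
my @readers = map { threads->create(\&pool_reader) } 1 .. 2;

my $listener = IO::Socket::INET->new(
    LocalAddr => '127.0.0.1',
    LocalPort => 0,                  # port 0: let the OS pick a free port
    Listen    => 5,
    Proto     => 'tcp',
) or die "Can't create a listening socket: $@";

# Demo client (ASSUMPTION, for a self-terminating example): connect, send one
# heartbeat, hang up. The connection waits in the listen backlog.
my $demo = IO::Socket::INET->new(
    PeerAddr => '127.0.0.1',
    PeerPort => $listener->sockport,
    Proto    => 'tcp',
) or die "Cannot connect: $@";
print {$demo} "Heartbeat from hostA next one in 5 minutes\n";
close $demo;

my %pending;   # non-shared hash: fileno => original handle from accept()

# One pass of the accept loop; a real server would loop on accept().
my $conn = $listener->accept or die "accept failed: $!";
my $fd   = fileno $conn;
$pending{$fd} = $conn;       # keep the original handle alive
$work_q->enqueue($fd);       # pass only the integer through the queue

# Release the original once the pool reports it is done. dequeue() blocks,
# which suits this demo; inside a real accept loop dequeue_nb() avoids stalling.
my $finished = $done_q->dequeue;
my $original = delete $pending{$finished};
close $original;

# Shut the pool down: one undef per reader, then wait for them.
$work_q->enqueue(undef) for @readers;
$_->join for @readers;

my @messages;
while (defined(my $m = $msg_q->dequeue_nb)) {
    push @messages, $m;
}
print "reader got: $_" for @messages;
```

Keying `%pending` by fileno works because the original descriptor stays open until the accept thread closes it, so the number cannot be reused for another connection in the meantime.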

        It sounds complicated, but is actually easier to code than describe. I'll clean up one of my test programs and post it later today.


