PerlMonks  

Re^4: multithreaded tcp listener with IO::Socket

by Random_Walk (Parson)
on May 15, 2006 at 13:36 UTC (#549496)


in reply to Re^3: multithreaded tcp listener with IO::Socket
in thread multithreaded tcp listener with IO::Socket

hi BrowserUk,

I tried using a thread pool but cannot seem to pass the socket handle through the thread queue. Here is the error thrown by Thread::Queue. It happens as soon as the first client connects.

Updated, see below
    C:\test>perl new_ecg.pl
    Server ready. Waiting for connections on 8000 ...
    Invalid value for shared scalar at C:/Perl/lib/Thread/Queue.pm line 90.
    A thread exited while 12 threads were running.

Here is the code I am using:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use IO::Socket;
    use threads;
    use Thread::Queue;

    #get the port to bind to or default to 8000
    my $port = $ARGV[0] || 8000;
    my $listeners = 10;

    # a hash to record client machines and thread queue for internal communication
    my %clients;
    my $mqueue = Thread::Queue -> new;
    my $squeue = Thread::Queue -> new;

    threads->create ("monitor", $mqueue);
    for (1..$listeners) {
        threads->create ("read_data", $mqueue, $squeue)->detach;
    }

    #create the listen socket
    my $listen_socket = IO::Socket::INET->new(LocalPort => $port,
                                              Listen    => 20,
                                              Proto     => 'tcp',
                                              Reuse     => 1);

    #make sure we are bound to the port
    die "Can't create a listening socket: $@" unless $listen_socket;

    warn "Server ready. Waiting for connections on $port ... \n";

    #wait for connections at the accept call
    while (my $connection = $listen_socket->accept) {
        # put the connection on the queue for a reader thread
        $squeue -> enqueue($connection);
    }

    sub read_data {
        # accept data from the socket and put it on the queue
        my ($mqueue, $squeue) = @_;
        while (my $socket = $squeue -> dequeue) {
            while (<$socket>) {
                print "listener got: $_";
                $mqueue -> enqueue(time." $_");
            }
            close $socket;
        }
    }

    sub monitor {
        my $mqueue = shift;
        while (1) {
            while ($mqueue -> pending) {
                my $data = $mqueue -> dequeue;
                print "monitor got: $data";
                $data =~ /(\d+) Heartbeat from (\S+) next one in (\d+) minutes/;
                my $time   = $1;
                my $client = $2;
                my $cycle  = $3;
                if (defined $clients{$client} and $clients{$client} -> [0] eq 'NAK') {
                    print "$client sent a beat again\n";
                }
                $clients{$client} = [ 'OK', $time + $cycle * 60 ];
            }
            for my $client (keys %clients) {
                next if $clients{$client}->[0] eq 'NAK';
                next if $clients{$client}->[1] > time;
                print "$client missed a heartbeat, expected at $clients{$client}->[1], now it is ".time."\n";
                $clients{$client}->[0] = 'NAK';
            }
            sleep 30;
        }
    }

Update
I searched on that error and found that Thread::Queue can only handle simple scalar values. I downloaded and installed Thread::Queue::Any, which can handle more complex objects, but it still seems to choke on the handle:
    [tivadm@rtmr]/home/tivoli/robin$ ./new_ecg.pl
    Server ready. Waiting for connections on 8000 ...
    Can't store GLOB items at ../../lib/Storable.pm (autosplit into ../../lib/auto/Storable/_freeze.al) line 282, at /usr/opt/perl5/lib/site_perl/5.8.0/Thread/Queue/Any.pm line 30
    A thread exited while 12 other threads were still running.

Any suggestions on how I can pass the sockets to the thread pool? Perhaps I can find another data structure that can hold them and then pass some form of pointer to that.

Cheers,
R.

Pereant, qui ante nos nostra dixerunt!


Re^5: multithreaded tcp listener with IO::Socket
by BrowserUk (Pope) on May 15, 2006 at 16:59 UTC

    This is a known problem. The solution is to pass the fileno( $socket ) between the threads. As this is just an integer, it bypasses the restrictions on shared variables. Once you get the fileno in the destination thread, you can re-open the socket there and use it in the normal ways.
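A minimal sketch of that idea, assuming a Unix-like system and a perl built with thread support. A socketpair stands in for the socket that accept() would return, and the names $fd_queue, $server_end and $client_end are made up for the illustration. The worker is started before any socket exists, as a thread pool would be, so all it ever receives is the integer.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Socket;
use IO::Handle;

my $fd_queue = Thread::Queue->new;    # carries plain integers only

# Worker: dequeue a file descriptor number and re-open it in this thread.
my $worker = threads->create( sub {
    my $fileno = $fd_queue->dequeue;
    open my $socket, '+<&=' . $fileno
        or die "cannot re-open descriptor $fileno: $!";
    $socket->autoflush(1);
    my $line = <$socket>;             # read one line from the peer
    print $socket "OK $line";         # and acknowledge it
    return $line;
} );

# Assumption: this pair replaces a real accepted connection.
socketpair( my $server_end, my $client_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair failed: $!";
$client_end->autoflush(1);

# Queue the integer, not the handle; $server_end stays in scope here.
$fd_queue->enqueue( fileno $server_end );

print $client_end "Heartbeat\n";
my $reply = <$client_end>;
my $seen  = $worker->join;

print "worker saw: $seen";
print "client got: $reply";
```

Note that '&=' aliases the same descriptor rather than duplicating it, so the original handle must stay alive in the queuing thread until the worker has re-opened it.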

    See Re: FileHandles and threads & Re^3: Passing globs between threads for some background and example code, plus some discussion of the caveats. The main problem with this approach is that you have to ensure that the socket returned by accept() does not go out of scope (causing the socket to be closed) before the thread has had a chance to reopen it. That's easily achieved by pushing a copy of the socket handle into a (non-shared) array in the accept thread, as well as queueing its fileno.

    However, you then have to arrange for those copies to be cleaned up so that you do not leak handles. The method I've evolved to handle that involves saving copies of the handles in the accept thread as the values in a non-shared hash, keyed by the fileno. Once the client thread has finished with the socket, it closes its handle and queues the fileno back to the accept thread, which uses it to close the original handle as a part of the accept loop.
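The bookkeeping described above could look roughly like this. It is only a sketch under stated assumptions: socketpairs again stand in for accepted connections, the connections are handled strictly one at a time, and the names %open, $work_queue and $done_queue are hypothetical. It needs a Unix-like system and a threads-enabled perl.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Socket;
use IO::Handle;

my $work_queue = Thread::Queue->new;    # accept thread -> worker: filenos to serve
my $done_queue = Thread::Queue->new;    # worker -> accept thread: filenos finished

# Start the worker before any sockets exist, as a thread pool would.
my $worker = threads->create( sub {
    my $served = 0;
    while ( defined( my $fileno = $work_queue->dequeue ) ) {
        open my $socket, '+<&=' . $fileno
            or die "cannot re-open descriptor $fileno: $!";
        $socket->autoflush(1);
        my $line = <$socket>;
        print $socket "OK $line" if defined $line;
        close $socket;                   # worker is finished with it
        $done_queue->enqueue($fileno);   # tell the accept thread
        $served++;
    }
    return $served;
} );

my %open;       # non-shared: fileno => original handle, keeps it in scope
my @replies;

for my $n ( 1 .. 3 ) {
    # Assumption: each pair replaces one accepted client connection.
    socketpair( my $conn, my $peer, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
        or die "socketpair failed: $!";
    $peer->autoflush(1);

    my $fileno = fileno $conn;
    $open{$fileno} = $conn;              # hold a copy so it is not closed early
    $work_queue->enqueue($fileno);

    print $peer "Heartbeat $n\n";
    push @replies, scalar <$peer>;

    # Release the saved copy once the worker reports it is done.
    my $finished = $done_queue->dequeue;
    delete $open{$finished};
}

$work_queue->enqueue(undef);             # undef tells the worker to stop
my $served = $worker->join;

print @replies;
print "served $served connections, ", scalar( keys %open ), " handles still held\n";
```

Because '&=' aliases the same descriptor, the worker's close already closes it, and the delete in the accept thread only releases the Perl-level handle. A busy server that polls the done queue inside its accept loop would need to make sure a recycled descriptor number cannot collide with a stale hash entry.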

    It sounds complicated, but is actually easier to code than describe. I'll clean up one of my test programs and post it later today.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Hi BrowserUk,

      I am still having a little trouble with my socket handles. I put the fileno on a queue and the thread picks it up OK. The problem comes down to this line in the handler thread:
      open my $socket, '<&=', $fileno;

      The socket of course is opened read only but I need to write an 'OK' back to it when I have got the event. I tried
      open my $socket, '+<&=', $fileno;

      Still no luck. Any idea how I can re-open it read/write?

      Thanks a million for your help, there is a $favourite_drink waiting for you any time you are in Amsterdam,
      R.


        You need a dot ('.'), not a comma (',') (and don't forget some error handling :).

        open my $socket, '+<&=' . $fileno or die $!;
        ........................^

        For an explanation of the syntax and what it is doing, see perlopentut and the section entitled "Re-Opening Files (dups)".
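To see the difference between the two mode strings without any threads involved, here is a small sketch that aliases one end of a socketpair first with '<&=' and then with '+<&=', and tries to print through each alias. The socketpair and the variable names are assumptions made for the illustration, and it expects a Unix-like system.

```perl
use strict;
use warnings;
use Socket;
use IO::Handle;

my %result;    # mode string => 1 if the peer received the line, else 0

for my $mode ( '<&=', '+<&=' ) {
    # A fresh pair per mode, because closing an '&=' alias closes the
    # shared descriptor as well.
    socketpair( my $near, my $far, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
        or die "socketpair failed: $!";

    my $fileno = fileno $near;
    open my $alias, $mode . $fileno
        or die "open '$mode$fileno' failed: $!";

    # Printing to a read-only handle fails; the 'io' warning is
    # silenced so only the return value matters here.
    my $printed = do { no warnings 'io'; print {$alias} "OK\n" };

    close $alias;             # flush anything buffered, close the descriptor
    my $got = <$far>;         # the line if it was sent, undef at end of file

    $result{$mode} = ( $printed && defined $got && $got eq "OK\n" ) ? 1 : 0;
}

for my $mode ( '<&=', '+<&=' ) {
    printf "%-4s %s\n", $mode, $result{$mode} ? 'writable' : 'read only';
}
```

The '+<' prefix is what asks for a read/write handle; the '&=' part only says to reuse the existing descriptor number.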

        Sorry for not posting the code I promised. I've been trying to work it into a proper module. It works, but needs substantial extra testing and documentation. I'll post it below as it is in its current state.

        The usage will be something like this:

        my $server = threads::Server->new(
            LocalPort => 9000,
            Pool      => 5,
            Accept    => sub {
                printf "Accepted connection from %s:%d on port:%d\n",
                    $_->peerhost, $_->peerport, $_->sockport;
                return;
            },
            Thread    => sub {
                my( $client, $Qout, @args ) = @_;
                while( <$client> ) {
                    chomp;
                    ##1 warnf "Got '%s'\n", $_;
                    $Qout->enqueue( $_ );
                    print $client 'Ack';
                }
            },
            Common    => sub {
                my $Q = shift;
                while( $Q->dequeue() ) {
                    #1 warnf "Processing '$_'";
                }
            },
        );
        $server->Run;

        That needs explanation, documentation, and a lot of work, and it relies on another of my own modules (a ripoff of theDamian's Smart::Comments), but it might provide some ideas for you.


