PerlMonks  

Sharing sockets between the main script and thread

by markseger (Beadle)
on Nov 24, 2008 at 23:46 UTC (#725730=perlquestion)
markseger has asked for the wisdom of the Perl Monks concerning the following question:

I have a thread that manages sockets: it listens for connections and opens/closes them as appropriate. I do the actual communication in the main script, using the technique described in several Monks postings for sharing sockets: share the file number rather than the socket handle itself, and do an open in the main line using that number, as in

open ($fd, ">&$fnum") or die;

$fd is now a handle that I can write to, and it works like a champ. Sort of, hence my question. I'm hoping I just have the open syntax wrong, in which case the fix might be a simple one.
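The two open() forms that take a file number behave differently, which bears on the syntax question. A minimal sketch, assuming a Unix-like system with socketpair(); the socket pair stands in for an accepted connection and the variable names are illustrative. With `&` (the form used in `">&$fnum"` and `"+<&$fnum"`), Perl dup()s the descriptor, so the new handle gets its own file number and keeps the socket open until it, too, is closed. With `&=`, Perl fdopen()s the existing descriptor, so no new file number is created. `>` gives a write-only Perl handle, `+<` a read/write one.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);

# A connected socket pair stands in for an accepted client socket.
socketpair(my $sock, my $peer, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";
my $fnum = fileno($sock);

# "&" dups the descriptor (dup(2)): the new handle gets a NEW file number.
# Three-argument equivalent of the two-argument form "+<&$fnum".
open(my $dup, '+<&', $fnum) or die "dup of fd $fnum failed: $!";

# "&=" fdopens the SAME descriptor: no new file number is created.
open(my $alias, '+<&=', $fnum) or die "fdopen of fd $fnum failed: $!";

my ($dup_fd, $alias_fd) = (fileno($dup), fileno($alias));
printf "original fd %d, dup fd %d, alias fd %d\n", $fnum, $dup_fd, $alias_fd;

# The dup is an independent descriptor and has to be closed on its own;
# closing it leaves the original socket handle open.
close($dup) or die "closing the dup failed: $!";
```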

First I run the script and monitor the open sockets with netstat. Then I run a test script that simply connects to that socket and does a sysread, prints the results, closes the connection and exits.

Netstat shows the initial connection, and then the socket goes into FIN_WAIT2 and hangs around for quite a while, or until I close my main script. On the other hand, if I run the code non-threaded and do all the communication inline, I see the socket go to FIN_WAIT2 and then go away fairly quickly.

My guess is that when the test script closes the socket, the close is detected and acknowledged by the thread where the socket was created, but the copy of the socket created with open() never completes the shutdown handshake. I don't know enough about these mechanisms to know what to do about it.

Any help will be greatly appreciated.

btw, I also tried the open technique in some non-threaded code, and it too works correctly but leaves a connection in FIN_WAIT2 as well, so I'm guessing this has nothing to do with threads and more to do with having to close that second handle.

-mark

Re: Sharing sockets between the main script and thread
by BrowserUk (Pope) on Nov 25, 2008 at 01:07 UTC

    Try it this way:

        #! perl -slw
        use strict;
        use IO::Socket;
        use threads qw[ yield ];
        use threads::shared;
        use Thread::Queue;

        sub worker {
            my $Q = shift;
            while( my $fnum = $Q->dequeue ) {
                open my $client, "+<&$fnum" or die $!;
                while( <$client> ) {
                    print $client $_;
                    print $_;
                }
                close $client;
            }
        }

        our $WORKERS ||= 5;

        my $Q = new Thread::Queue;
        my @workers = map threads->create( \&worker, $Q ), 1 .. $WORKERS;

        my $server = IO::Socket::INET->new(
            LocalHost=>'localhost:54321', Listen=>5, Reuse=> 1
        ) or die $^E;

        while( my $client = $server->accept ) {
            my $fno = fileno( $client );
            $Q->enqueue( $fno );
            yield while $Q->pending; ## wait until a worker dups the socket
            close $client;           ## Now we can safely close it.
        }

    Rationale: The socket won't be released until all open handles to it are closed. You are currently getting one handle from the accept and creating a second within the thread (via the fileno). When the client goes away, the thread's read loop ends and its handle gets (explicitly or implicitly) closed. But the handle in the main thread remains, and the socket will not be finalised until that one is closed too, and nothing in the main thread ever closes it. If you close the socket before (or even immediately after!) queuing the fileno for the worker thread, then by the time the worker gets it, there is no socket to dup.
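The rule that a socket is only released once every handle to it is closed can be checked without threads or TCP. A minimal sketch, assuming a Unix-like system; a socketpair() stands in for the accepted connection and the names are illustrative. The peer sees end-of-file only after the last handle is closed. With TCP the same rule applies: absent an explicit shutdown(), the connection is not shut down (no FIN is sent) until the last descriptor referring to it is closed, so the side that closed first waits in FIN_WAIT2.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Select;

socketpair(my $server_end, my $client_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";

# Second handle to the same socket, made from its file number (a dup).
open(my $second, '+<&', fileno($server_end)) or die "dup failed: $!";

my $select = IO::Select->new($client_end);

# Close the original handle only: $second still refers to the socket,
# so the client end sees neither data nor end-of-file.
close($server_end);
my @ready = $select->can_read(0.2);
my $readable_after_first_close = @ready ? 1 : 0;

# Close the last handle: now the client end becomes readable with EOF.
close($second);
@ready = $select->can_read(0.2);
my $readable_after_last_close = @ready ? 1 : 0;
my $bytes = sysread($client_end, my $buf, 100);   # 0 means end-of-file

print "readable after first close: $readable_after_first_close\n";
print "readable after last close:  $readable_after_last_close\n";
print "sysread returned: $bytes\n";
```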

    So, when accept returns, take the fileno, queue it, then yield your timeslices until the queue is empty. Now a worker has dequeued the fileno and opened its own handle to the socket, so it is safe to close the main thread's copy. Everyone is happy :)


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Excellent! I couldn't quite do it the way you suggested, for a couple of reasons. One is that I'm doing my accepts in the thread rather than in the main-line code, so the logic would need to be reversed. More significant is that the thread is stuck in select->can_read() waiting for connection requests; it can't do that and watch a queue at the same time unless I wrap it all in an outer loop with a timeout, and I didn't want to do that.

      Nevertheless, since I'm passing the file numbers from the thread to the main code in a shared hash, I can set an entry to a 'special' value when the socket is closed in the thread, telling the main line to close its end as well.
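The main-line half of that scheme could look roughly like this. A minimal sketch, assuming the listener thread stores each accepted file number in the shared hash with the value 0 and changes it to -1 when the client disconnects (the convention the server code in this thread uses). `%sock_state`, `%dups` and `sweep_connections` are hypothetical names; a plain hash and a socketpair() stand in for the threads::shared hash and a real connection so it runs without threads. In the threaded program the sweep would run while holding lock() on the shared hash, and the listener would close its own handle only after the main line has closed its dup.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);

my %sock_state;   # hypothetical: fileno => 0 (open) or -1 (closed by listener)
my %dups;         # hypothetical: fileno => the main line's own dup'd handle

# One pass of the main loop: dup newly announced sockets, release closed ones.
sub sweep_connections {
    for my $fn (keys %sock_state) {
        if ($sock_state{$fn} == -1) {
            # Listener saw the disconnect: close our copy, then forget it.
            close($dups{$fn}) if defined $dups{$fn};
            delete $dups{$fn};
            delete $sock_state{$fn};
            next;
        }
        next if defined $dups{$fn};           # already have a handle
        open($dups{$fn}, '>&', $fn)           # same dup as ">&$fn"
            or warn "couldn't dup fd $fn for writing: $!\n";
    }
}

# Simulate the listener thread announcing one accepted connection.
socketpair(my $accepted, my $peer, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";
my $fn = fileno($accepted);
$sock_state{$fn} = 0;

sweep_connections();
my $opened = defined $dups{$fn} ? 1 : 0;     # main line now holds a dup

$sock_state{$fn} = -1;                       # listener marks it closed
sweep_connections();
my $released = (exists $sock_state{$fn} || exists $dups{$fn}) ? 0 : 1;
print "opened: $opened, released: $released\n";
```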

      In any event, the key to all this is the need to close the second descriptor, and that did the trick. It seems to work like a champ, at least so far...

      -mark

Re: Sharing sockets between the main script and thread
by zentara (Archbishop) on Nov 25, 2008 at 13:32 UTC
    I'm not sure what your code looks like, but you might want to keep a shared clients array and let the thread do its own cleanup. See Simple threaded chat server:
        #close filehandle before detached thread dies out
        close( $lclient);
        #remove multi-echo-clients from echo list
        @clients = grep {$_ !~ $lfileno} @clients;
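One detail in that snippet: `$_ !~ $lfileno` uses the file number as a regular expression, so it drops every entry whose digits merely contain it (removing fileno 1 also removes 10, 11, 21, ...). Assuming @clients holds plain file numbers, as `$lfileno` suggests, a numeric comparison removes only the departing client. The values below are made-up file numbers.

```perl
use strict;
use warnings;

my @clients = (1, 5, 10, 11, 21);   # hypothetical file numbers
my $lfileno = 1;                    # the client that just left

# Pattern match: "10", "11" and "21" all contain "1", so they vanish too.
my @by_regex   = grep { $_ !~ $lfileno } @clients;

# Numeric comparison: only fileno 1 itself is removed.
my @by_numeric = grep { $_ != $lfileno } @clients;

print "regex:   @by_regex\n";
print "numeric: @by_numeric\n";
```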

    I'm not really a human, but I play one on earth Remember How Lucky You Are
      Looks like I finally got this working, sort of, so I'm posting what I have. However, I significantly cut back my test code, and it has a very odd quirk. There's a print statement at the very top of the main loop in the server that simply prints 'top'. If I comment it out, the script hangs after the first connection; it eventually wakes up, but things get very ugly. If I uncomment it, it works fine. Also, if I run 'netstat -a' while it's hanging, it wakes up too. Very bizarre.

      That said, here's the server code. It accepts connections from one or more clients and then starts writing the value of $count to them. There's also a sleep statement at the bottom of the main loop, which you can comment out to increase the rate at which messages are sent to the clients. You can start/stop one or more clients while this is running, and other than that problem with the print statement it seems very solid to me. You can also stop/start the server, and the clients will reconnect when the socket becomes available.

      Also note I have a bunch of logging messages that helped me coordinate problems between the client and the server. Easy enough to turn off.

      So here's the main server code, with LOTS of error handling...

          #!/usr/bin/perl -w

          use Time::HiRes;
          use IO::Socket;
          use IO::Select;
          use threads;
          use threads::shared;
          use Thread::Queue;

          $SIG{"INT"}=\&sigInt;      # for ^C
          $SIG{"PIPE"}=\&sigPipe;    # socket comm errors

          my %sockConns;
          share(%sockConns);

          $q1 = new Thread::Queue;
          $q2 = new Thread::Queue;
          my $thread=threads->create('manageSock', $q1, $q2)->detach;

          my $done=0;
          my $count=0;
          while(!$done)
          {
            print "top\n";
            $count++;
            lock(%sockConns);
            foreach my $fn (keys %sockConns)
            {
              logit("FN: $fn=$sockConns{$fn}");
              if ($sockConns{$fn}==-1)
              {
                logit(">>>Close 1st: $fn");
                $sockOpened{$fn}->close()    if defined($sockOpened{$fn});
                delete $sockOpened{$fn};
                delete $sockConns{$fn};
                $q1->enqueue($fn);
                my $wait=$q2->dequeue;
                logit("Continue...");
                last;
              }

              if (!defined($sockOpened{$fn}) && !open($sockOpened{$fn}, ">&$fn"))
              {
                print "Couldn't open socket $fn for writing\n";
                next;
              }

              logit("Write: $count TO: $fn");
              $bytes=syswrite($sockOpened{$fn}, "$count/n", length($count)+1, 0);

              # Do nothing as socket will disconnect and normal cleanup will kick in
              if (!$bytes)
              {
                logit("========================> Comm Failure <====================");
                last;
              }
              logit("Wrote $bytes bytes");
            }
            sleep 1;    # uncomment to slow responses down
          #  print "awake\n";
          }

          sub manageSock
          {
            my $q1=shift;
            my $q2=shift;

            $port=2655;
            my $sockServer = new IO::Socket::INET(
                  Type=>SOCK_STREAM,
                  Reuse=>1,
                  Listen => 1,
                  LocalPort => $port) || error("Could not create local socket on port $port");
            logit("Server socket opened on port $port");
            my $select=new IO::Select($sockServer);

            while(1)
            {
              logit("Waiting on socket");
              while (my @ready=$select->can_read)
              {
                my $saveFnum;
                my $saveHandle;
                my $waitForClose=0;
                foreach my $filehandle (@ready)
                {
                  lock(%sockConns);
                  logit("Socket 'can read'");
                  if ($filehandle==$sockServer)
                  {
                    my $new=$sockServer->accept() || logmsg('E', "Couldn't accept connection request");
                    $select->add($new);
                    my $fnum=$new->fileno();
                    $sockConns{$fnum}=0;
                    $sockNumConn++;
                    logit("Connection on FN: $fnum");
                  }
                  else
                  {
                    my $message=<$filehandle>;
                    my $fnum=$filehandle->fileno();
                    if (!defined($message))
                    {
                      logit("Client Disconnect FN: $fnum");
                      $saveFnum=$fnum;
                      $saveHandle=$filehandle;
                      $waitForClose=1;
                      $sockConns{$fnum}=-1;
                      last;
                    }
                    else
                    {
                      logit("Ignoring: $message");
                    }
                  }
                }

                if ($waitForClose)
                {
                  logit("Waiting for 1st socket close");
                  my $fnum=$q1->dequeue;
                  $select->remove($saveHandle);
                  $saveHandle->close();
                  $sockNumConn--;
                  $q2->enqueue($fnum);    # tell main process OK to release lock
                }
              }
            }
          }

          sub sigPipe
          {
            #trap but ignore
          }

          sub sigInt
          {
            print "^C\n";
            $done=1;
          }

          sub logit
          {
            my $text=shift;
            my ($intSeconds, $intUsecs)=Time::HiRes::gettimeofday();
            $time=sprintf("$intSeconds.%06d", $intUsecs);
            print "$time $text\n";
          }
      And here's the client I test it with. The first argument is the server's address (I've been doing all my testing with client and server on the same system). If you specify a second argument, the client reads a response, sleeps for a second and reads another, looping until you hit ^C, at which point it exits cleanly so you can restart it. As I said, you can run multiple instances, starting and stopping them, and they do the right thing. Finally, if you give it a third argument, it skips the sleep 1 and connects/disconnects as fast as possible.
          #!/usr/bin/perl -w

          use IO::Socket;
          use IO::Select;
          use Time::HiRes;

          if (!defined($ARGV[0]))
          {
            print "usage: client.pl address[:port] continuous nosleep\n";
            exit;
          }

          ($address,$port)=split(/:/, $ARGV[0]);
          $port=2655    if !defined($port);
          $contFlag= defined($ARGV[1]) ? 1 : 0;
          $sleepFlag=defined($ARGV[2]) ? 0 : 1;

          $SIG{"INT"}=\&sigInt;    # for ^C
          select STDOUT; $|=1;

          while(1)
          {
            logit("OPEN");
            $socket=new IO::Socket::INET(
                  PeerAddr => $address,
                  PeerPort => $port,
                  Proto    => 'tcp',
                  Timeout  =>1);
            if (!defined($socket))
            {
              logit("Couldn't connect to server, retrying");
              sleep 1;
              next;
            }

            $select = new IO::Select($socket);
            logit("Try to read");
            while (my @ready=$select->can_read())
            {
              logit("Can_read");
              $bytes=sysread($socket, $line, 100);
              if ($bytes==0)
              {
                logit("Socket closed on other end");
                $socket='';
                last;
              }
              @handles=($select->can_read(0));
              last    if scalar(@handles)==0;
            }
            chomp $line;
            logit($line);
            logit("client close");
            $socket->close    if $socket ne '';
            $select->remove($socket);
            last       if !$contFlag;
            sleep 1    if $sleepFlag;
          }

          sub sigInt
          {
            print "Close Socket\n";
            $socket->close();
            exit;
          }

          sub logit
          {
            my $text=shift;
            my ($intSeconds, $intUsecs)=Time::HiRes::gettimeofday();
            $time=sprintf("$intSeconds.%06d", $intUsecs);
            print "$time $text\n";
          }
      If anyone has any clue why it only works correctly with that "print top" statement, I'd love to hear an answer. My collectl script doesn't have this problem, but there's also a lot of other activity going on in its main processing loop, so perhaps that's why. I also suspect my scripts could be somewhat more compressed but I guess I've always been in the habit of being more verbose so both myself and others could better understand what I'm doing...

      -mark
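On the "print top" mystery, one thing worth checking is where the main loop's lock is released. lock() holds a shared variable until the lock goes out of scope, and in the posted main loop `lock(%sockConns)` sits directly in the `while` body, so it is held through `sleep 1` and dropped only for the instant between one iteration and the next. The listener thread's own `lock(%sockConns)` can only get in during that gap, and a print just before re-locking may be enough to widen it. That is a plausible explanation, not a confirmed one. Below is a minimal sketch of the usual remedy, scoping the lock to a bare block so it is released before sleeping. The hash contents, loop count and 10 ms sleep are placeholders, and because no threads are started here, lock() is effectively a no-op; the sketch only shows the structure.

```perl
use strict;
use warnings;
use threads::shared;   # no threads are started here, so lock() does nothing

my %sockConns :shared;                # fileno => state, as in the server
%sockConns = (7 => 0, 8 => -1);       # hypothetical: 7 open, 8 disconnected
my $count = 0;
my @released;

for my $iteration (1 .. 3) {          # stands in for: while (!$done)
    $count++;
    {
        lock(%sockConns);             # held only until the end of this block
        for my $fn (keys %sockConns) {
            next unless $sockConns{$fn} == -1;
            push @released, $fn;      # real code: close the dup, signal thread
            delete $sockConns{$fn};
        }
    }                                 # lock released here, before sleeping
    select(undef, undef, undef, 0.01);   # real code: sleep 1; the listener
                                         # thread can take the lock meanwhile
}
print "iterations: $count, released: @released\n";
```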
        If anyone has any clue why it only works correctly with that "print top" statement

        Quite frankly, if your code does indeed work correctly with that "print top" statement, then I'd just leave that statement there. Then I'd contact the Magic Circle and show it to them, because they'd probably pay you a big lump of cash once they work out how it "works"; they'd be able to use it as the basis of some damn good illusions.

        If you add use strict; to the top of your program and fix all these errors:

            C:\test>perl -c junk8.pl
            Global symbol "$q1" requires explicit package name at junk8.pl line 16.
            Global symbol "$q2" requires explicit package name at junk8.pl line 17.
            Global symbol "$q1" requires explicit package name at junk8.pl line 18.
            Global symbol "$q2" requires explicit package name at junk8.pl line 18.
            Global symbol "%sockOpened" requires explicit package name at junk8.pl line 31.
            Global symbol "%sockOpened" requires explicit package name at junk8.pl line 31.
            Global symbol "%sockOpened" requires explicit package name at junk8.pl line 32.
            Global symbol "$q1" requires explicit package name at junk8.pl line 34.
            Global symbol "$q2" requires explicit package name at junk8.pl line 35.
            Global symbol "%sockOpened" requires explicit package name at junk8.pl line 40.
            Global symbol "%sockOpened" requires explicit package name at junk8.pl line 40.
            Global symbol "$bytes" requires explicit package name at junk8.pl line 46.
            Global symbol "%sockOpened" requires explicit package name at junk8.pl line 46.
            Global symbol "$bytes" requires explicit package name at junk8.pl line 49.
            Global symbol "$bytes" requires explicit package name at junk8.pl line 53.
            Global symbol "$port" requires explicit package name at junk8.pl line 63.
            Global symbol "$port" requires explicit package name at junk8.pl line 68.
            Global symbol "$port" requires explicit package name at junk8.pl line 68.
            Global symbol "$port" requires explicit package name at junk8.pl line 69.
            Global symbol "$sockNumConn" requires explicit package name at junk8.pl line 87.
            Global symbol "$sockNumConn" requires explicit package name at junk8.pl line 114.
            Global symbol "$time" requires explicit package name at junk8.pl line 133.
            Global symbol "$time" requires explicit package name at junk8.pl line 134.
            junk8.pl had compilation errors.

        You might get close to understanding some of your problems. The entire logic of your code is dependent upon this hash %sockOpened. You use it to direct the flow of your program all over the place:

            $sockOpened{$fn}->close() if defined($sockOpened{$fn});
            ...
            if (!defined($sockOpened{$fn}) && !open($sockOpened{$fn}, ">&$fn")) {
            ...
            $bytes = syswrite($sockOpened{$fn}, "$count/n", length($count)+1, 0);

        But you never declare that variable, and you never write to it, so what is it that you are testing?
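For what it's worth, open() itself does store into %sockOpened: perlfunc's entry for open notes that when the filehandle argument is an undefined scalar (or array or hash element), a new filehandle is autovivified into it. Without `use strict`, the hash is just an undeclared package variable. A minimal sketch of that behaviour; `%handles` and the in-memory file are illustrative stand-ins.

```perl
use strict;
use warnings;

my %handles;                          # declared, as use strict would require
my $data = "first line\nsecond line\n";

# An undefined hash element as the first argument: open() autovivifies
# a new filehandle (a GLOB reference) and stores it in that element.
open($handles{demo}, '<', \$data) or die "open failed: $!";

# <$handles{demo}> would be parsed as a glob(), so use readline() instead.
my $line = readline($handles{demo});
print "stored a ", ref($handles{demo}), " reference; read: $line";
close($handles{demo});
```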

        I also suspect my scripts could be somewhat more compressed but I guess I've always been in the habit of being more verbose so both myself and others could better understand what I'm doing...

        Sorry, but it doesn't seem to be working for you.



Node Type: perlquestion [id://725730]
Approved by planetscape
Front-paged by grinder