Beefy Boxes and Bandwidth Generously Provided by pair Networks
Don't ask to ask, just ask
 
PerlMonks  

Problem handling 2 simultaneous socket streams

by PhillyR (Acolyte)
on Sep 15, 2011 at 14:45 UTC ( #926158=perlquestion: print w/ replies, xml ) Need Help??
PhillyR has asked for the wisdom of the Perl Monks concerning the following question:

I've created a proxy script to receive 2 streams of data and forward to multiple clients (each client picks which stream they want). I'm having a problem (most likely some blocking) when both streams are active at the same time. My proxy listens for connections from the server sending stream1. My proxy initiates a connection to a second server for stream2 when a client asks for stream2. Confusing? Excellent. Onto the code:
#!/usr/bin/perl -w use IO::Socket; use IO::Select; $stream1_prt = 5555; $stream2_prt = 5556; $client1_prt = 1100; $client2_prt = 1101; $my_ip = '192.168.1.12'; # Create Stream 1 listen $stream1_lsn = new IO::Socket::INET (LocalAddr => $my_ip, LocalPort => $stream1_prt, Type => SOCK_STREAM, Proto => "tcp", Reuse => 1, Blocking => 0, #part of issue? Listen => 5) or die "Stream 1 listen socket couldn't be create: $@\n"; # Create Client 1 listen $client1_lsn = new IO::Socket::INET (LocalAddr => $my_ip, LocalPort => $client1_prt, Type => SOCK_STREAM, Proto => "tcp", Reuse => 1, Blocking => 0, #part of issue? Listen => 5) or die "Client 1 listen socket couldn't be create: $@\n"; # Create Client 2 listen $client2_lsn = new IO::Socket::INET (LocalAddr => $my_ip, LocalPort => $client2_prt, Type => SOCK_STREAM, Proto => "tcp", Reuse => 1, Blocking => 0, #part of issue? Listen => 5) or die "Client 2 listen socket couldn't be create: $@\n"; # Add listen sockets to select $sockets = new IO::Select(); $sockets->add($stream1_lsn); $sockets->add($client1_lsn); $sockets->add($client2_lsn); #Go into infinite loop, handling connections - how do I end this? while(@ready = $sockets->can_read) { #got data foreach $socket (@ready) { # find which socket sent data $sck_prt = $socket->sockport; $pr_prt = $socket->peerport; #used by sockets sending data #New Stream1 if ($socket == $stream1_lsn) { $stream1_sock = $stream1_lsn->accept(); #accept stream 1 $sockets->add($stream1_sock); #add socket to select $stream1_hst = $stream1_sock->peerhost; print "New stream1 from IP [$stream1_hst]\n"; } #New client1 elsif ($socket == $client1_lsn) { $client1_sock = $client1_lsn->accept(); #accept client 1 $sockets->add($client1_sock); #add socket to select $client1_hst = $client1_sock->peerhost; print "New client for stream 1 @ IP [$client1_hst]\n"; } #New client2 elsif ($socket == $client2_lsn) { $client2_sock = $client2_lsn->accept(); #accept client 2 $sockets->add($client2_sock); #add socket to select $client2_hst = $client2_sock->peerhost; print "New client for stream 2 @ IP [$client2_hst]\n"; #Connection to second server running locally $stream2_sock = new IO::Socket::INET( PeerAddr => 'localhost', PeerPort => $stream2_prt, Type => SOCK_STREAM, Proto => "tcp") or die "Stream2 socket couldn't be created: $@\n"; $sockets->add($stream2_sock); #add socket to select } #Stream2 data to all client2's elsif(defined($stream2_sock)) { #may not be initiated yet if($socket == $stream2_sock) { # NEXT LINE IS BLOCKING --------------------------- if(defined($buf1=<$socket>)) { #put stream2 into buf1 @writers1 = $sockets->can_write; foreach writer1 (@writers1) { $wrt_sp1 = $writer1->sockport; if ($wrt_sp1 == $client2_prt) { print $writer1 $buf1; #send stream2 to each client2 } } } } #Stream1 data to all Client1's elsif(defined($pr_prt)) { #used this port due to multiple $stream1_sock connections #THIS LINE DOESN'T EXCUTE IF STREAM2 IS ACTIVE ----------- if(defined($buf=<$socket>)) { #put stream1 into buffer @writers = $sockets->can_write; foreach $writer (@writers) { $wrt_sp = $writer->sockport; if($sck_prt == $stream1_prt && $wrt_sp == $client1_prt) { print $writer $buf; #send stream1 to all client1's } } } } else { print "Closing socket\n"; #Needed if clients close connection $sockets->remove($socket); close($socket); } } }
When I connect a client1 to the proxy it connects and waits for stream1. When stream1 server initates the connection, stream1 starts flowing to client1. Later when client2 connects, the proxy initiates the connection to the second server and stream2 starts. Stream2 goes to client2 but stream1 stops. I need stream1 to continue to flow. What am I doing wrong?

Comment on Problem handling 2 simultaneous socket streams
Download Code
Re: Problem handling 2 simultaneous socket streams
by BrowserUk (Pope) on Sep 15, 2011 at 14:55 UTC
    # NEXT LINE IS BLOCKING --------------------------- if(defined($buf1=<$socket>)) { #put stream2 into buf1

    Using buffered IO with non-blocking sockets is a no-no. readline() (<>) won't return until it sees the current delimiter (typically "\n"); if the other end sends data without a delimiter, it will block; if the other end never gets around to sending a delimiter, then it will block forever.

    The solution is to use read, sysread or recv, but then you'll need to buffer and track the delimiters yourself, which gets awfully messy.

    The whole thing would be far simpler using threads.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Good advice.   (Fav'd.)

      A thread can indeed be assigned to do nothing more than sit there, parked on a blocking-read, and maybe posting the received lines to a common queue for processing if the processing that needs to be done happens to favor some serialized approach.   The threads basically will consume almost-no resources other than a process-table entry somewhere in the guts of the operating system, and they permit you to use the “simple calls” to entirely avoid logical complexity.   They are also in the perfect position to recognize when a connection dies, and to record statistics about the connection.

        The threads basically will consume almost-no resources other than a process-table entry somewhere in the guts of the operating system

        Oh. Thank you for explaining that to me. Except you're wrong!

        Threads are not processes, so therefore do not have "process-table entries".

        On the other hand, threads have their own stacks; their own interpreter contexts; their own copies of the environment; their own copies of pre-existing global context; their own proxies for any shared data; their own ...

        In other words, you aren't just slightly off, but absolutely diametrically wrong on all counts.

        So why post? Why do you -- who evidently know little of the subject, and understand even less of the little you have some inkling of -- feel the need to demonstrate to me -- I think fair to say, one of the more knowledgeable monks with regard to threading -- just how useless your home-spun wisdom fairy stories on this subject, as with so many others, really are?

        Are you hoping to help me? Or help the OP? Or to ingratiate yourself with me in particular or the monks in general?

        What is the point of your posting this garbage?

        Oh look, someone front paged this thread. I wonder if that could have influenced your need to "say something"?


        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.
Re: Problem handling 2 simultaneous socket streams
by ikegami (Pope) on Sep 15, 2011 at 14:55 UTC

    I started writing a mini-tutorial explaining why readline (aka <$fh>) interacts badly with select.

    On that same line, your use of can_write is broken. 1) It can also block. 2) It might not return all sockets, in which case you'll suffer data loss. I can't go into details right now, so I'll push you in the right direction: You need to use an output buffer for each writer socket and use IO::Select::select($readers, $writers, undef) where you currently use can_read.

    It's much easier to do using threads.

Re: Problem handling 2 simultaneous socket streams
by zentara (Archbishop) on Sep 15, 2011 at 15:26 UTC
    You might get some ideas from testing Simple threaded chat server. It has a simple method for making the chat echo to all clients, and you can probably modify it for your purposes manipulating the @clients list, or splitting @clients into 2, @clients1 and @clients2. It's not bulletproof code, but does demonstrate the flow thru threads.

    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku ................... flash japh
      Thanks for the advice. I switched over to using a combination of select and threads for managing the separate incoming streams and it is working properly.
      How would I go about adding in reading STDIN to allow a user to type quit to end this program? I have unsuccessfully tried creating a new thread (prior to the while loop below) that monitored STDIN. Once again I was blocked.
      #!/usr/bin/perl -w # ----------------------------- use warnings; use strict; use IO::Socket; use IO::Select; use threads; use threads::shared; # ----------------------------- # ----------------------------- our @stream1_clients : shared; our @stream2_clients : shared; my $stream1_prt = 5555; my $stream2_prt = 5556; my $client1_prt = 1100; my $client2_prt = 1101; my $ip = '192.168.1.12'; # ----------------------------- # Create Stream 1 listen ------ my $stream1_lsn = new IO::Socket::INET (LocalAddr => $ip, LocalPort => $stream1_prt, Type => SOCK_STREAM, Proto => "tcp", Reuse => 1, Listen => 5) or die "Stream 1 listen socket couldn't be created: $@\n"; # ----------------------------- # Create Client 1 listen ------ my $client1_lsn = new IO::Socket::INET (LocalAddr => $ip, LocalPort => $client1_prt, Type => SOCK_STREAM, Proto => "tcp", Reuse => 1, Blocking => 0, #part of issue? Listen => 5) or die "Client 1 listen socket couldn't be create: $@\n"; # ----------------------------- # Create Client 2 listen ------ my $client2_lsn = new IO::Socket::INET (LocalAddr => $ip, LocalPort => $client2_prt, Type => SOCK_STREAM, Proto => "tcp", Reuse => 1, Blocking => 0, #part of issue? Listen => 5) or die "Client 2 listen socket couldn't be create: $@\n"; # ----------------------------- # Add listen sockets to select my $sockets = new IO::Select(); $sockets->add($stream1_lsn); $sockets->add($client1_lsn); $sockets->add($client2_lsn); # ----------------------------- #<< SPLIT OFF A SEPARATE THREAD FOR STDIN?? #Go into infinite loop, handling connections - how do I end this? while(my @ready = $sockets->can_read) { #got data foreach my $socket (@ready) { # find which socket sent data #New Stream1--------------------- if ($socket == $stream1_lsn) { my $stream1_sock = $stream1_lsn->accept(); #accept stream 1 $stream1_hst = $stream1_sock->peerhost; print "New stream1 from IP [$stream1_hst]\n"; my $thr_stream1 = threads->new(\&stream1, $stream1_sock)->detach +(); } # ----------------------------- #New client1 ------------------ elsif ($socket == $client1_lsn) { my $client1_sock = $client1_lsn->accept(); #accept client 1 $sockets->add($client1_sock); #add socket to select my $fileno = fileno $client1_sock; push (@stream1_clients, $fileno); $client1_hst = $client1_sock->peerhost; print "New client for stream 1 @ IP [$client1_hst]\n"; } # ----------------------------- #New client2 ------------------ elsif ($socket == $client2_lsn) { my $client2_sock = $client2_lsn->accept(); #accept client 2 $sockets->add($client2_sock); #add socket to select my $fileno2 = fileno $client2_sock; push(@stream2_clients, $fileno2); $client2_hst = $client2_sock->peerhost; print "New client for stream 2 @ IP [$client2_hst]\n"; #Connection to second server running locally my $stream2_sock = new IO::Socket::INET( PeerAddr => 'localhost', PeerPort => $stream2_prt, Type => SOCK_STREAM, Proto => "tcp") or die "Stream2 socket couldn't be created: $@\n"; my $thr_stream2 = threads->new(\&stream2, $stream2_sock)->detach +(); } # ----------------------------- } } # Routine called by thread to send stream 1 sub stream1 { my ($lclient1) = @_; my $buf1; if ($lclient1->connected) { while(defined($buf1=<$lclient1>) { foreach $fn1 (@stream1_clients) { open my $fh1, ">&=fn1" or warn $! and die; binmode($fh1, ":raw"); print $fh1 $buf1; } } } close ($lclient1); } # ----------------------------- # Routine called by thread to send stream 2 sub stream2 { my ($lclient2) = @_; my $buf2; if ($lclient2->connected) { while(defined($buf2=<$lclient2>) { foreach $fn2 (@stream2_clients) { open my $fh2, ">&=fn2" or warn $! and die; binmode($fh2, ":raw"); print $fh2 $buf2; } } } close ($lclient2); } # ----------------------------- # sub STDIN_stream ??
        How would I go about adding in reading STDIN to allow a user to type quit to end this program?

        See Re: Ask for STDIN but don't pause for it and the last example. This is untested in your code, but something like this will work. Just add \*STDIN to your Select object and test for it.

        # Add listen sockets to select my $sockets = new IO::Select(); $sockets->add(\*STDIN ); $sockets->add($stream1_lsn); $sockets->add($client1_lsn); $sockets->add($client2_lsn); ..... #Go into infinite loop, handling connections - how do I end this? while(my @ready = $sockets->can_read) { #got data foreach my $socket (@ready) { # find which socket sent data if ($socket == \*STDIN) { #read stdin and detect a q; exit; }

        I'm not really a human, but I play one on earth.
        Old Perl Programmer Haiku ................... flash japh
Re: Problem handling 2 simultaneous socket streams
by osbosb (Monk) on Sep 15, 2011 at 17:48 UTC
    Don't forget to:
    use strict;
    in addition to your warnings switch
      Good catch. Absolutely use use strict;. Even more important than use warnings; or -w.
Re: Problem handling 2 simultaneous socket streams
by salva (Monsignor) on Sep 16, 2011 at 06:40 UTC
Reaped: Re: Problem handling 2 simultaneous socket streams
by NodeReaper (Curate) on Sep 18, 2011 at 13:59 UTC
Reaped: Re: Problem handling 2 simultaneous socket streams
by NodeReaper (Curate) on Sep 19, 2011 at 13:54 UTC

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: perlquestion [id://926158]
Approved by rovf
Front-paged by Corion
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others chilling in the Monastery: (5)
As of 2014-07-30 11:07 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    My favorite superfluous repetitious redundant duplicative phrase is:









    Results (230 votes), past polls