PerlMonks  

share socket connections amongst multiple threads?

by Elijah (Hermit)
on Jul 08, 2005 at 18:11 UTC ( [id://473523]=perlquestion )

Elijah has asked for the wisdom of the Perl Monks concerning the following question:

I have run into an issue while creating a hash of socket IDs and trying to share them across multiple threads. I get the error "Invalid value for shared scalar" because I am trying to share an object reference (the socket ID ref). This of course is not allowed, but I cannot think of an alternative to make this work.
#!/usr/bin/perl -w
use strict;
use threads;
use threads::shared;
use IO::Socket::INET;

my $sock = new IO::Socket::INET (
    LocalHost => 'localhost',
    LocalPort => '12345',
    Proto     => 'tcp',
    Listen    => 10,
    Blocking  => 1,
);
die "Could not create socket: $!\n" unless $sock;

my($thread, $cnt);
my %usersocks : shared;

while (my $UserSock = $sock->accept()) {
    $cnt++;
    $thread = threads->create(\&connection, $UserSock, $cnt);
}
$_->join for threads->list;

sub connection {
    my ($sockID, $cnt) = @_;
    my $sockData;
    lock(%usersocks);
    $usersocks{$cnt} = $sockID;
    while($sockID->recv($sockData, 1024)) {
        print $sockData;
        #debug code to see contents of hash
        while(my($key, $value) = each(%usersocks)) {
            print "$key => $value\n";
        }
        $usersocks{$_}->send($sockData) for sort(keys %usersocks);
    }
    close($sockID);
    delete $usersocks{$sockID};
}

The line $usersocks{$cnt} = $sockID; is of course my issue because $sockID is the obj reference to the connection id. Is there a way of accomplishing this? I ultimately want all data sent to this server application to be sent out to all threaded clients.
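The failure described above can be reproduced without any network code. The following is a minimal sketch, assuming a Perl built with ithreads; `try_store` is a hypothetical helper introduced only for illustration, and an `IO::Handle` object stands in for the socket object, since both are unshared references.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use IO::Handle;

my %usersocks : shared;

# Hypothetical helper: try to store a value in the shared hash and
# return the error message, or an empty string on success.
sub try_store {
    my ($key, $value) = @_;
    my $ok = eval { $usersocks{$key} = $value; 1 };
    return $ok ? '' : $@;
}

my $plain_err  = try_store(1, 'just a string');   # plain scalar
my $handle_err = try_store(2, IO::Handle->new);   # unshared object reference

print "plain scalar: ", ($plain_err  ? "failed: $plain_err"  : "stored"), "\n";
print "object ref:   ", ($handle_err ? "failed: $handle_err" : "stored"), "\n";
```

A plain string is accepted, while the handle object is rejected, which is the same situation as storing `$sockID` in `%usersocks`.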

Replies are listed 'Best First'.
Re: share socket connections amongst multiple threads?
by waswas-fng (Curate) on Jul 08, 2005 at 18:34 UTC
    Why do you need to share the IO::Socket::INET connection object? I don't get it. If you want to send data to all of the threads you don't need the object, just a shared var or two that you can use as a queue. Am I missing what you are trying to do here?


    -Waswas
      Am I missing what you are trying to do here?

      I believe so. Each thread is a spawned socket connection between the server and client that is connecting. When each threaded socket gets sent data on its respective connection it needs to re-broadcast this data to all the other socket connections. I am not trying to share simple text or integer data between the threads, I am trying to make each socket connection aware of the rest.

        Your explanation of what you are trying to do is still lacking.

        Just for a moment, let's assume that you can arrange for multiple threads to have a copy of the same socket. At this point, you have 1 client writing to 1 socket and multiple server threads wanting to read from it. The first thread that reads the socket will get the data, and all the others won't.

        You mention "re-broadcasting". How?

        Sockets are point to point. In order for any one server thread to be able to re-broadcast data to every other server thread, it would need to have a separate socket connection for each of those other servers and re-transmit the data to all of them.

        For 2 server threads, you would need 2 sockets. 1 to the client and 1 between the threads.

        For 3 server threads, you would need 3 sockets + the client: s1<->s2; s1<->s3; s2<->s3; + s(n)<->C;

        For 4 server threads, you would need 6 sockets + the client: s1<->s2; s1<->s3; s1<->s4; s2<->s3; s2<->s4; s3<->s4;

        For 5 server threads, you would need 10 sockets + the client: ...

        For 6 server threads, you would need 15 ... I think you can see where this is going.
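The progression above (1, 3, 6, 10, 15) is the number of distinct pairs among n server threads, n(n-1)/2. A short sketch of that arithmetic follows; `mesh_links` is a hypothetical name used only for this illustration, and it counts the thread-to-thread sockets only, not the connections to clients.

```perl
use strict;
use warnings;

# Number of thread-to-thread sockets needed so that every one of $n
# server threads has a direct link to every other one.
sub mesh_links {
    my ($n) = @_;
    return $n * ($n - 1) / 2;
}

printf "%d server threads -> %d inter-thread sockets\n", $_, mesh_links($_)
    for 2 .. 6;
```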

        And, remember we are just assuming that you could successfully share the socket to the client between multiple threads, which you cannot. You would also have to arrange for each thread to "monitor" all of these sockets waiting for input.

        Put succinctly, what you are trying to do is not a "limitation of threads", but a limitation of your understanding. A bad design that could never work.

        In order for us to suggest solutions, you will need to explain what you are trying to achieve, rather than how you are trying to achieve it.

        If the idea is that all threads will have access to data read from a client, then reading that data from the socket on one thread only and then placing it into shared memory such that all clients have access to it is possible--but there is a problem.

      • When will you know that the data has been seen by all the threads that need access to it? That is to say, when will you know that any given piece of data is finished with?

        If you do not have some mechanism for deciding when a piece of data will be discarded, then all inbound data will simply accumulate in memory and you have a memory problem.

        The classic way of dealing with this problem for some applications is a shared queue. This works well for producer-consumer type problems where each piece of data placed on the queue by a producer is consumed by only one consumer.
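As an illustration of the shared-queue approach, here is a minimal producer-consumer sketch using the core Thread::Queue module. The worker count, the item strings, and the use of `undef` as a stop marker are assumptions made for the example, not something taken from the thread.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue   = Thread::Queue->new;
my $workers = 3;

# Each consumer takes items until it sees the undef stop marker.
sub consumer {
    my @seen;
    while (defined(my $item = $queue->dequeue)) {
        push @seen, $item;
    }
    return @seen;
}

my @threads = map { threads->create({ context => 'list' }, \&consumer) }
              1 .. $workers;

$queue->enqueue("line $_") for 1 .. 10;     # the producer
$queue->enqueue(undef) for 1 .. $workers;   # one stop marker per consumer

my @consumed = map { $_->join } @threads;
printf "%d items consumed in total\n", scalar @consumed;
```

Each item is dequeued by exactly one consumer, so the question of when a piece of data is finished with does not arise: it leaves the queue the moment it is taken.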

        But, from the little you have said, I think you are more likely talking about something like an IRC server or MUD server. For this type of application you are better off using one queue per client thread, and a central dispatcher or controller thread with its own queue, plus a listener/client thread factory which may or may not need its own queue depending upon the details of the application.
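One possible shape for that design, sketched without any sockets so that it runs on its own: a dispatcher thread with its own queue, plus one outbound queue per client. The client IDs, the tab-separated "sender, text" message format, and the `undef` stop marker are all assumptions made for the sketch. In a real server the reader threads would enqueue onto `$inbound` and each writer would print to its own client's socket.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my @ids     = (1 .. 3);               # hypothetical client IDs
my $inbound = Thread::Queue->new;     # the dispatcher's own queue
my %outbound;                         # one outbound queue per client
$outbound{$_} = Thread::Queue->new for @ids;

# Dispatcher: forward each "sender<TAB>text" message to every other client.
my $dispatcher = threads->create(sub {
    while (defined(my $msg = $inbound->dequeue)) {
        my ($from, $text) = split /\t/, $msg, 2;
        for my $id (@ids) {
            $outbound{$id}->enqueue($text) unless $id == $from;
        }
    }
    $outbound{$_}->enqueue(undef) for @ids;   # tell every writer to stop
});

# Writers: each drains only its own queue. A real writer would print to
# its client's socket; here it just collects the lines.
my %writer;
for my $id (@ids) {
    $writer{$id} = threads->create({ context => 'list' }, sub {
        my @sent;
        while (defined(my $line = $outbound{$id}->dequeue)) {
            push @sent, $line;
        }
        return @sent;
    });
}

# Stand-ins for what the reader threads would enqueue.
$inbound->enqueue("1\thello from 1");
$inbound->enqueue("2\thello from 2");
$inbound->enqueue(undef);             # shut the dispatcher down

$dispatcher->join;
my %received;
$received{$_} = [ $writer{$_}->join ] for @ids;
print "client $_ got: @{ $received{$_} }\n" for @ids;
```

Because every client has its own queue, each thread only ever touches its own socket, and a message is gone from memory once the last client queue holding a copy has been drained.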


        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        Lingua non convalesco, consenesco et abolesco. -- Rule 1 has a caveat! -- Who broke the cabal?
        "Science is about questioning the status quo. Questioning authority".
        The "good enough" maybe good enough for the now, and perfection maybe unobtainable, but that should not preclude us from striving for perfection, when time, circumstance or desire allow.
        That I understand, what I don't understand is what that process has to do with the actual socket. It seems like the model should be: start each connection on its own thread -- that thread is in control of the connection. If any data is received by any thread then assign that to a "queue" (shared var or vars) and then have each of the other threads that are masters of their connections read off that queue and push out the data on their connection. I just don't get what benefit sharing the connection object has in that view of things.


        -Waswas
Re: share socket connections amongst multiple threads?
by zentara (Archbishop) on Jul 08, 2005 at 21:11 UTC
    A while back an anonymous monk figured out a threaded-chat-echo-server at thread:shared. To save you time, here is the final code we came up with: (I've included a forked client at the end too)
    #!/usr/bin/perl
    # buffered Chat Server (build on the threaded chat server of Patric Haller)
    # that does NOT echo the lines back to the
    # Client where they are from. Therefore: $ID which is used in the loop
    # of Reader and the Writer.
    # Furthermore it doesn't do anything if there is only 1 Client.

    use strict;
    use warnings;
    use threads;          # pull in threading routines
    use threads::shared;  # and variable sharing routines
    use IO::Socket::INET; # and rock the sock et
    use File::Temp qw/ :POSIX /;

    #t: buffered, threaded Chat Server, options -cev:
    our $p2Cons   = ( "@ARGV" =~ /^\-[ev]*c/i ) ? 1 : 0; #: (-c) print incomminge lines to console (used in Reader), default: no print to console
    our $e2Client = ( "@ARGV" =~ /^\-[cv]*e/i ) ? 1 : 0; #: (-e) echo incomming lines back to Client (used Reader & Writer), default: no echo
    our $verbose  = ( "@ARGV" =~ /^\-[ec]*v/i ) ? 1 : 0; #: (-v) verbose mode, default: not verbose, (for logging use: >> File.log)

    if ( "@ARGV" =~ /\?|h/) {
        print "\nuse of this program:\n\t$0 [-celv]".
              "\n\t-c: print incomming lines to console,".
              "\n\t-e: echo incomming lines back to the client,".
              "\n\t-v: verbose mode\n\n";
        exit;
    }

    # internal Variables
    our @chat:shared = ();      # buffer for incomming lines
    # $Elm is used as hash, but the single Items cannot be protected by lock
    # Therefore I choosed a string
    our $Elm:shared = '';       #
    our $NoClient:shared = 0;   # No. of Clients connected
    our $EOL = "\r\n";
    # signal for the Writer to die,
    # otherwise $NoClient and $Elm would be hard do admin.
    tmpnam() =~ /.+mp\/(.+)/;
    our $kill = $1.' my Socket broke '.$1; # to have a secure kill-code (no matter how it looks like)

    $SIG{PIPE} = 'ignore';
    sub ignore { ; }
    #We ignore SIGPIPEs generated by clients trying to work
    #with closed connections. SIGPIPEs, if not handled, cause death.

    my $server = IO::Socket::INET->new(LocalPort => 3333,
                                       Type      => SOCK_STREAM,
                                       Reuse     => 1,
                                       Listen    => 10)
        or die $!;

    while (my $client = $server->accept()){ #foreach $client
        my $pAddr = $client->peerhost();
        if ( $pAddr !~ /^127\.0\.0\./ && $pAddr !~ /^10\.10\.10\.\d+/ && $pAddr !~ /^192\.168\.0\.\d+/ ) {
            print $client 'Sorry not for you..',$EOL;
            print "Ooops, who was that? Addr: $pAddr\n";
            close($client);
            next;
        }
        lock($NoClient);
        $NoClient++;
        cond_broadcast($NoClient);
        tmpnam() =~ /.+mp\/(.+)/;   # get a secure ID
        lock($Elm);                 # add this ID to $Elm
        $Elm .= "$1:0;";
        cond_broadcast($Elm);

        my $r = threads->new(\&Reader::run, client => $client, "ID", "$1", "Addr", $pAddr );
        $r->detach();
        my $w = threads->new( \&Writer::run, client => $client, "ID", "$1" );
        $w->detach();
    }

    #####
    package Reader; #: detached process to receive the Client's Input
    use threads;
    use threads::shared;

    sub new { #: create Reader
        my $pkg = shift; #: Package
        my $self = { @_ };
        return bless($self, $pkg); ##: arr of blessed (self, pkg)
    }

    sub run { #: runs until the socket of this Reader dies; reads from the socket and pushs it into @chat
        my $self = Reader->new(@_);     #: Me
        my $socket = $self->{client};   #: The socket of the Client
        my $ID = $self->{ID};           #: The ID (same as for the Writer)
        my $Time = time;
        printf "$ID\t%12s has connected at %s\n",$self->{Addr}, scalar(localtime($Time));
        my $l;
        while(defined ($l = <$socket>) ){
            # only 1 Client don't echo!
            print "$ID <\t$l" if $verbose;
            next if ($NoClient < 2 && (!$e2Client) );
            # skip empty lines: this may not work for everyone
            $l =~ /(.+)[\n\r]+/;
            if ($1) {
                lock(@chat);
                # add ID, so that the Writer knows what NOT to send => NO echo!
                push @chat, "$ID\t$1";
                cond_broadcast(@chat);
            }
            print "$ID\t$1\n" if ($p2Cons);
        } # end while
        printf "$ID\t%12s disconnected at %s after %s\n",$self->{Addr}, scalar(localtime(time)), s2T(time-$Time);
        print "Reader $ID\n\tI'm going to die, bye ..\n" if $verbose;
        lock($NoClient);
        lock($Elm);
        $l = ''; # used here as tmp
        foreach ( split /;/, $Elm ) {
            $l .= "$_;" if ( $_ !~ /^$ID\:/ && $_ =~/:/);
        }
        $Elm = $l;
        print "\tnew Client indexes:$Elm\n" if $verbose;
        $NoClient--;
        cond_broadcast($NoClient);
        cond_broadcast($Elm);
        lock(@chat);
        cond_broadcast(@chat);
        ##: nothing
    }

    sub s2T { #: calcs sec into days hh:mm:ss
        # my $dur = shift   #: Duration in sec. transfrmd into Days hh:mm:ss
        ##: formated string [d] hh:mm:ss
        if ( $_[0] > 86400 ) {
            my $ti = ( ($_[0]%86400)/3600 )%100;
            my $t = ($_[0]%86400) - ($ti*3600);
            return sprintf(" %i d %3i:%02i:%02i", int($_[0] / 86400),$ti,(($t/60)%60),($t-((($t/60)%60)*60)));
        }
        my $ti = ( $_[0]/3600 )%100;
        my $t = $_[0] - ($ti*3600);
        return sprintf("%3i:%02i:%02i",$ti,(($t/60)%60),($t-((($t/60)%60)*60)));
        #: return fotmatted transcripted duration
    }

    #####
    package Writer; #: detached process to print to the socket for the client
    use threads;
    use threads::shared;

    sub new { #: create a Writer
        my $pkg = shift;    #: Package
        my $self = { @_ };  #: Me
        return bless($self, $pkg); ##: arr of blessed (self, pkg)
    }

    sub run { #: runs until it gets the code to die from the Reader
        my $self = Writer->new(@_);     #: Me
        my $socket = $self->{client};   #: Socket to the Client
        my $ID = $self->{ID};           #: The Writer/Readers ID
        my (%E, $min);
        while( 1 ) {
            lock(@chat);
            cond_wait(@chat);
            # shall I die?
            last if ( $Elm !~ /$ID:/ );
            next unless (@chat);
            lock($Elm);
            %E = (map { _split($_) } (split /;/, $Elm));
            print "Writer $ID\n\tsends ",(($e2Client) ? ' ' : 'up to '),(@chat - $E{$ID})," lines from ",(scalar @chat)," of \@chat\n" if $verbose;
            foreach ( @chat[$E{$ID} .. $#chat] ) { # all before $E{$ID} has been send by me
                /(.+?)\s(.+)[\n\r]*/; # split into $ID and org. line
                # and send only the line ONLY if it is not from 'my' Reader and it is not the kill-code
                if ($e2Client) {
                    print $socket $2,$EOL if ( $2 ne $kill );
                } else {
                    print $socket $2,$EOL if ( $1 ne $ID && $2 ne $kill);
                }
            }
            # now rewrite $Elm and Chat
            $E{$ID} = @chat;
            $min = min(values %E);
            # print "deleting form chat-buffer:\n",(map { $_."\n" } @chat[0 .. ($min-1)]),"\n" if $verbose;
            print "\tdelets from \@chat $min lines\n" if $verbose;
            @chat = @chat[$min .. $#chat]; # to eliminate all before $min and keep the rest
            $Elm ='';                      # to rewrite $E
            foreach ( keys %E ) {
                $Elm .= "$_:".(($_ eq $ID) ? @chat : ($E{$_} - $min) ).';';
            }
            print "\tnew \@chat, size: ",scalar @chat,";\n\tClient indexes:$Elm\n" if $verbose;
            cond_broadcast($Elm);
        } # end while
        print "Writer $ID\n\tdies too, ..\n" if $verbose;
        ##: nothing
    }

    sub min { #: min of value-list
        # @_ =   #: LIST of values (int,float)
        my $m = shift;
        foreach (@_) { $m = $_ if $m > $_ }
        return $m; ##: min of list
    }

    sub _split { #: internal use to split a string 'key:item' into key and item for a hash
        # my $_[0]   #: String to be splitted at ':'
        /(.+):(.+)/;
        return ($1) ? ($1 => $2) : (); ##: pair Key => Item or an empty list
    }
    __END__

    ## a test client#####################
    #!/usr/bin/perl -w
    use strict;
    use IO::Socket;

    my ( $host, $port, $kidpid, $handle, $line );
    ( $host, $port ) = ('192.168.0.1',3333);

    #my $name = shift || '';
    #if($name eq ''){print "What's your name?\n"}
    #chomp ($name = <>);

    # create a tcp connection to the specified host and port
    $handle = IO::Socket::INET->new(
        Proto    => "tcp",
        PeerAddr => $host,
        PeerPort => $port
    ) or die "can't connect to port $port on $host: $!";
    $handle->autoflush(1); # so output gets there right away
    print STDERR "[Connected to $host:$port]\n";

    # split the program into two processes, identical twins
    die "can't fork: $!" unless defined( $kidpid = fork() );

    # the if{} block runs only in the parent process
    if ($kidpid) {
        # copy the socket to standard output
        while ( defined( $line = <$handle> ) ) {
            print STDOUT $line;
        }
        kill( "TERM", $kidpid ); # send SIGTERM to child
    }
    # the else{} block runs only in the child process
    else {
        # copy standard input to the socket
        while ( defined( $line = <STDIN> ) ) {
            #print $handle "$name->$line";
            print $handle "$line";
        }
    }
    __END__
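The server above coordinates its Reader and Writer threads with `lock`, `cond_wait` and `cond_broadcast` on the shared `@chat` buffer. The fragment below isolates that mechanism in a small self-contained sketch. It is not a drop-in replacement for the posted code: `@buffer` and `$done` are names invented for the example, and the wait is wrapped in an `until` loop so that a broadcast sent before the consumer starts waiting is not missed.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my @buffer : shared;       # plays the role of @chat
my $done   : shared = 0;   # set once the producer has finished

# Consumer: sleep until the buffer has content or we are told to stop.
my $consumer = threads->create({ context => 'list' }, sub {
    my @got;
    while (1) {
        lock(@buffer);
        cond_wait(@buffer) until @buffer || $done;
        push @got, @buffer;    # take everything currently buffered
        @buffer = ();
        last if $done;
    }
    return @got;
});

# Producer: append a line and wake any waiting consumer.
for my $n (1 .. 5) {
    lock(@buffer);
    push @buffer, "line $n";
    cond_broadcast(@buffer);
}
{
    lock(@buffer);
    $done = 1;
    cond_broadcast(@buffer);
}

my @lines = $consumer->join;
print "$_\n" for @lines;
```

`cond_wait` releases the lock while the thread sleeps and reacquires it before returning, which is why the producer can take the same lock to append.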

    I'm not really a human, but I play one on earth. flash japh
