############################################################################# ######## MAIN THREAD my $lsn = new IO::Socket::INET( Listen => SOMAXCONN, LocalPort => $port, ReuseAddr => 1, Blocking => 1, ) or die "Failed to open listening port: $!\n"; print "RXD+ had been started on port $port\n"; debug("Password is '$password'"); my @threads = (); $doneQ = MyQ->new(); $clientQ = MyQ->new(); for(1..$_numThreads){ my $thd = threads->create('processRequest'); push(@threads, $thd); } debug("Thread creation finished"); #making the CWD the directory RXD is located in use FindBin qw($Bin); my $currentDir = $Bin; if ($^O =~ /win/i) { $currentDir =~ tr/\//\\/; } elsif (($^O !~ /win/i) && ($currentDir !~ /\//)) { $currentDir .= "/"; } chdir($currentDir); # create a cache hash while(1) { my $client; my $fd; debug("Accepting connection, there are currently " . $clientQ->pending() . " connections in the queue"); unless ($client = $lsn->accept) { tprint ("Could not connect to socket: " . $!); next; } debug("Connection accepted"); if(!defined($client)){tprint("Could not connect to client"); next;} # change filehandle to file descriptor $fd = fileno $client; unless(defined($fd)){close($client);next;} # add filehandle to cache hash # tprint("Caching file descriptor- $fd"); $FDcache{$fd} = $client; debug("Enqueueing"); $clientQ->enqueue($fd); while(my $done = $doneQ->dequeue_nb()){ debug("Deleting from cache"); delete $FDcache{$done}; } } sub shutdownRXD{ tprint("Sutting down"); exit 0; } ############################################################################# package MyQ; use strict; use warnings; use threads::shared; # Create a new queue possibly pre-populated with items sub new { my $class = shift; my @queue :shared = map { shared_clone($_) } @_; return bless(\@queue, $class); } sub enqueue { my $queue = shift; lock(@$queue); # print "Q> enqueueing\n"; push(@$queue, map { shared_clone($_) } @_) and cond_signal(@$queue); } # Return a count of the number of items on a queue sub pending { my $queue = shift; lock(@$queue); return scalar(@$queue); } # Return 1 or more items from the head of a queue, blocking if needed sub dequeue { my $queue = shift; lock(@$queue); # print "Q> dequeing\n"; # Wait for requisite number of items cond_wait(@$queue) until (@$queue >= 1); cond_signal(@$queue) if (@$queue > 1); # Return single item return shift(@$queue); } # Return items from the head of a queue with no blocking sub dequeue_nb { my $queue = shift; lock(@$queue); # Return single item return shift(@$queue); }