http://www.perlmonks.org?node_id=549496


in reply to Re^3: multithreaded tcp listener with IO::Socket
in thread multithreaded tcp listener with IO::Socket

Hi BrowserUk,

I tried using a thread pool but cannot seem to pass the socket handle through the thread queue. Here is the error thrown by Thread::Queue; it happens as soon as the first client connects.

Updated, see below
    C:\test>perl new_ecg.pl
    Server ready. Waiting for connections on 8000 ...
    Invalid value for shared scalar at C:/Perl/lib/Thread/Queue.pm line 90.
    A thread exited while 12 threads were running.

Here is the code I am using:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use IO::Socket;
    use threads;
    use Thread::Queue;

    # get the port to bind to or default to 8000
    my $port = $ARGV[0] || 8000;
    my $listeners = 10;

    # a hash to record client machines and thread queue for internal communication
    my %clients;
    my $mqueue = Thread::Queue -> new;
    my $squeue = Thread::Queue -> new;

    threads->create ("monitor", $mqueue);
    for (1..$listeners) {
        threads->create ("read_data", $mqueue, $squeue)->detach;
    }

    # create the listen socket
    my $listen_socket = IO::Socket::INET->new(LocalPort => $port,
                                              Listen    => 20,
                                              Proto     => 'tcp',
                                              Reuse     => 1);

    # make sure we are bound to the port
    die "Can't create a listening socket: $@" unless $listen_socket;

    warn "Server ready. Waiting for connections on $port ... \n";

    # wait for connections at the accept call
    while (my $connection = $listen_socket->accept) {
        # put the connection on the queue for a reader thread
        $squeue -> enqueue($connection);
    }

    sub read_data {
        # accept data from the socket and put it on the queue
        my ($mqueue, $squeue) = @_;
        while (my $socket = $squeue -> dequeue) {
            while (<$socket>) {
                print "listener got: $_";
                $mqueue -> enqueue(time." $_");
            }
            close $socket;
        }
    }

    sub monitor {
        my $mqueue = shift;
        while (1) {
            while ($mqueue -> pending) {
                my $data = $mqueue -> dequeue;
                print "monitor got: $data";
                $data =~ /(\d+) Heartbeat from (\S+) next one in (\d+) minutes/;
                my $time = $1;
                my $client = $2;
                my $cycle = $3;
                if (defined $clients{$client} and $clients{$client} -> [0] eq 'NAK') {
                    print "$client sent a beat again\n";
                }
                $clients{$client} = [ 'OK', $time + $cycle * 60 ];
            }
            for my $client (keys %clients) {
                next if $clients{$client}->[0] eq 'NAK';
                next if $clients{$client}->[1] > time;
                print "$client missed a heartbeat, expected at $clients{$client}->[1], now it is ".time."\n";
                $clients{$client}->[0] = 'NAK';
            }
            sleep 30;
        }
    }

Update
I searched on that error and found that Thread::Queue can only handle simple scalar values. I downloaded and installed Thread::Queue::Any, which can handle more complex objects, but it still chokes on the handle:

    [tivadm@rtmr]/home/tivoli/robin$ ./new_ecg.pl
    Server ready. Waiting for connections on 8000 ...
    Can't store GLOB items at ../../lib/Storable.pm (autosplit into ../../lib/auto/Storable/_freeze.al) line 282, at /usr/opt/perl5/lib/site_perl/5.8.0/Thread/Queue/Any.pm line 30
    A thread exited while 12 other threads were still running.

Any suggestions on how I can pass the sockets to the thread pool? Perhaps I can find another data structure that can hold them and then pass some form of pointer to it.
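
One form that "pointer" could take is the socket's file descriptor number. It is a plain integer, so Thread::Queue should accept it, and a worker can rebuild a handle from it with open and the '+<&' dup mode. The following is only a minimal sketch of that idea, not something confirmed in this thread. The %pending hash, the $done queue, the ephemeral port and the loopback client are hypothetical names and scaffolding added so the example runs on its own, and it assumes a platform where accepted sockets are ordinary file descriptors.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use IO::Socket::INET;

my $squeue = Thread::Queue->new;   # carries descriptor numbers, not handles
my $done   = Thread::Queue->new;   # hypothetical: worker reports "fd line" here

# Worker: rebuild a handle from the descriptor number and read one line.
sub read_data {
    while (defined(my $fd = $squeue->dequeue)) {
        # '+<&' duplicates the descriptor, so this thread owns its own copy
        open(my $socket, '+<&', $fd) or do {
            $done->enqueue("$fd ERROR $!");
            next;
        };
        my $line = <$socket>;
        close $socket;
        $done->enqueue("$fd " . (defined $line ? $line : ''));
    }
}

# Start the worker before any sockets exist so no handles get cloned into it.
my $worker = threads->create(\&read_data);

# Port 0 asks the OS for a free port (assumption, for a self-contained demo).
my $listen_socket = IO::Socket::INET->new(
    LocalAddr => '127.0.0.1', LocalPort => 0, Listen => 5, Proto => 'tcp',
) or die "Can't create a listening socket: $@";

# Loopback client standing in for a real heartbeat sender.
my $client = IO::Socket::INET->new(
    PeerAddr => '127.0.0.1', PeerPort => $listen_socket->sockport, Proto => 'tcp',
) or die "Can't connect: $@";
print $client "Heartbeat from testhost next one in 5 minutes\n";
close $client;

my %pending;   # hypothetical: keeps the original handle alive, keyed by fileno
my $connection = $listen_socket->accept or die "accept failed: $!";
my $fd = fileno $connection;
$pending{$fd} = $connection;
$squeue->enqueue($fd);             # only the integer crosses the queue

# Wait for the worker, then release the original handle.
my ($done_fd, $line) = split / /, $done->dequeue, 2;
my $orig = delete $pending{$done_fd};
close $orig if $orig;
print "worker got: $line";

$squeue->enqueue(undef);           # undef tells the worker to stop
$worker->join;
```

The accepting thread has to keep its own handle open until the worker has duplicated the descriptor, otherwise the number may refer to a closed or reused descriptor. That is why the sketch parks the handle in %pending and closes it only after the worker reports back.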

Cheers,
R.

Pereant, qui ante nos nostra dixerunt!