#!/usr/bin/perl
use strict;
use warnings;
use IO::Socket;
use threads;
use Thread::Queue;

# Heartbeat monitor server.
#
# Clients connect over TCP and send lines of the form
#   "Heartbeat from <client> next one in <N> minutes"
# A pool of reader threads timestamps each received line and forwards it to
# a single monitor thread, which records when the next beat of every client
# is due and reports clients whose beat is overdue.

# get the port to bind to or default to 8000
my $port = $ARGV[0] || 8000;

# number of reader threads serving accepted connections
my $listeners = 10;

# seconds the monitor sleeps between two passes over the client table
my $check_interval = 30;

# client name => [ state ('OK' or 'NAK'), epoch time the next beat is due ].
# Threads get their own copy of this hash; only the monitor thread uses it.
my %clients;

# thread queues for internal communication
my $mqueue = Thread::Queue->new;    # timestamped messages: readers -> monitor
my $squeue = Thread::Queue->new;    # accepted descriptors:  main -> readers
my $rqueue = Thread::Queue->new;    # descriptors readers are done duplicating

# All worker threads are detached: the main thread never joins them.
threads->create(\&monitor, $mqueue)->detach;
for (1 .. $listeners) {
    threads->create(\&read_data, $mqueue, $squeue, $rqueue)->detach;
}

# create the listen socket
my $listen_socket = IO::Socket::INET->new(
    LocalPort => $port,
    Listen    => 20,
    Proto     => 'tcp',
    Reuse     => 1,
);

# make sure we are bound to the port
die "Can't create a listening socket: $@" unless $listen_socket;

warn "Server ready. Waiting for connections on $port ... \n";

# Socket objects are glob references and cannot be placed on a Thread::Queue,
# so only the numeric file descriptor is handed to the readers. The main
# thread keeps its own handle alive (keyed by descriptor) until a reader
# reports on $rqueue that it has duplicated the descriptor.
my %held_for;

# wait for connections at the accept call
while (my $connection = $listen_socket->accept) {

    # Drop the handles whose descriptors the readers have already duplicated.
    # This runs only when a new connection arrives, so the main thread's copy
    # of a socket stays open until the next accept returns.
    while (defined(my $released_fd = $rqueue->dequeue_nb)) {
        delete $held_for{$released_fd};
    }

    # put the connection's descriptor on the queue for a reader thread
    my $fd = fileno $connection;
    $held_for{$fd} = $connection;
    $squeue->enqueue($fd);
}

# read_data($mqueue, $squeue [, $rqueue])
#
# Reader thread body. Takes file descriptors of accepted connections from
# $squeue, reads each connection line by line until end of file, and puts
# every line, prefixed with the current epoch time, on $mqueue.
#
#   $mqueue - queue receiving "<epoch> <line>" strings
#   $squeue - queue delivering numeric socket descriptors
#   $rqueue - optional queue; each descriptor is put on it once this thread
#             has duplicated it (or failed to), so the sender can close its
#             own handle
#
# Runs until dequeue returns undef; does not return a meaningful value.
sub read_data {
    my ($mqueue, $squeue, $rqueue) = @_;

    # test with defined() so that a descriptor of 0 would not end the loop
    while (defined(my $fd = $squeue->dequeue)) {

        # '+<&' duplicates the descriptor, giving this thread its own handle
        # that stays valid after the sender closes the original.
        my $dup_ok = open my $socket, '+<&', $fd;
        my $dup_error = $dup_ok ? '' : "$!";

        # report the descriptor back whether or not the duplication worked
        $rqueue->enqueue($fd) if $rqueue;

        unless ($dup_ok) {
            warn "listener cannot duplicate descriptor $fd: $dup_error\n";
            next;
        }

        # accept data from the socket and put it on the queue
        while (my $line = <$socket>) {
            print "listener got: $line";
            $mqueue->enqueue(time . " $line");
        }
        close $socket;
    }
    return;
}

# monitor($mqueue)
#
# Monitor thread body. Every $check_interval seconds it drains $mqueue,
# updates the %clients table from the heartbeat messages found there, and
# then marks as 'NAK' every client whose next beat is overdue. Never returns.
#
#   $mqueue - queue of "<epoch> Heartbeat from <client> next one in <N> minutes"
sub monitor {
    my $mqueue = shift;

    while (1) {

        # process everything that arrived since the previous pass
        while (defined(my $data = $mqueue->dequeue_nb)) {
            print "monitor got: $data";

            my ($time, $client, $cycle) =
                $data =~ /(\d+) Heartbeat from (\S+) next one in (\d+) minutes/;

            # skip lines that are not heartbeats instead of recording a
            # client built from undefined captures
            unless (defined $client) {
                warn "monitor ignored a malformed message\n";
                next;
            }

            if (defined $clients{$client} and $clients{$client}->[0] eq 'NAK') {
                print "$client sent a beat again\n";
            }

            # the next beat is due $cycle minutes after this one was received
            $clients{$client} = [ 'OK', $time + $cycle * 60 ];
        }

        # flag every client whose beat is overdue, once per outage
        for my $client (keys %clients) {
            next if $clients{$client}->[0] eq 'NAK';
            next if $clients{$client}->[1] > time;
            print "$client missed a heartbeat, expected at $clients{$client}->[1], now it is " . time . "\n";
            $clients{$client}->[0] = 'NAK';
        }

        sleep $check_interval;
    }
}