For the sake of completeness, I want to share the solution I ended up with, which uses POE (worlds of gratitude to Rocco for his input and willingness to answer my questions).
Here is the boilerplate code for the solution I created; I hope others find it useful as well! Side note: for my purposes I needed to use POE::Filter::Stream, but the default is POE::Filter::Line.
Though this is my current fix for the problem, I'm not very satisfied with the log-file workaround, which can be cumbersome. If anyone has suggestions or tips on how I could improve it and/or make it more efficient (pipes, etc.), I would love to hear them!
#!/usr/bin/perl -w
## This example establishes a full duplex
## TCP Client/Server connection in one file
##
## In order to facilitate two-way communication,
## log files located in /var/log/ are written to by
## a client program and POE uses POE::Wheel::FollowTail
## to signal that there is an information update
## (and suggestions on how to improve/optimize that
## would be very welcome)
use strict;
use warnings;
use POE qw(Component::Server::TCP
Component::Client::TCP
Wheel::FollowTail
Filter::Stream);
use constant INACTIVITY_TIMEOUT => 10; # seconds
use constant RECONNECT_TIMEOUT => 15; # seconds
# this layout was inspired by this entry
# from the POE Cookbook:
# http://poe.perl.org/?POE_Cookbook/Watching_Logs
#
# Maps a service name to the log file tailed for it. The service name
# is also the prefix of the event fired for new data
# (e.g. log_one => "log_one_record").
my %logs_to_watch = (
    log_one => "/var/log/log_one.log",
    log_two => "/var/log/log_two.log",
);
# Event-name => handler table for the session that owns the log watchers.
# The "<service>_record" names must match the InputEvent names built in
# begin_watchers.
my %watcher_states = (
    _start         => \&begin_watchers,
    log_reset      => \&generic_log_reset,
    log_error      => \&generic_log_error,
    log_one_record => \&log_one_got_record,
    log_two_record => \&log_two_got_record,
);
POE::Session->create( inline_states => \%watcher_states );
# Local TCP server (alias 'TCPServer') listening on port 8000.
# Each chunk of client input is printed and answered with a fixed '000'
# acknowledgement. When the server stops, it asks the 'TCPClient'
# session to shut down as well.
my $server = POE::Component::Server::TCP->new(
    Alias => 'TCPServer',
    Port  => 8000,
    ClientConnected => sub {
        print 'Client connected! (' . scalar(localtime) . ")\n";
    },
    ClientDisconnected => sub {
        print 'Client disconnected! (' . scalar(localtime) . ")\n";
    },
    ClientInput => sub {
        my $input = $_[ARG0];
        print "Server got: $input\n";
        my $reply = '000'; # example successful ACK response
        # the per-connection heap holds the client's wheel under {client}
        $_[HEAP]{client}->put($reply);
    },
    # any of the POE::Filter options can be used here
    # depending on remote input
    ClientFilter => "POE::Filter::Stream",
    Stopped => sub {
        my $kernel = $_[KERNEL];
        # terminates TCPClient's connection with
        # remote server if the local server shuts down
        $kernel->post('TCPClient', 'shutdown');
    },
);
# Outbound TCP client (alias 'TCPClient') connecting to example.com:8001.
# A 'keepalive' timer is armed on connect and re-armed after every server
# response; on disconnect the timer is cleared and a reconnect is
# scheduled after RECONNECT_TIMEOUT seconds.
my $client = POE::Component::Client::TCP->new(
    Alias         => 'TCPClient',
    RemoteAddress => 'example.com',
    RemotePort    => 8001,
    Connected => sub {
        my $kernel = $_[KERNEL];
        # triggers a keepalive timer for a custom keepalive method
        # to be fired after the set interval while connected
        $kernel->delay(keepalive => INACTIVITY_TIMEOUT);
    },
    Disconnected => sub {
        my $kernel = $_[KERNEL];
        # sets keepalive to undef to stop the delay timer
        $kernel->delay(keepalive => undef);
        # sets timer to attempt reconnection to address
        $kernel->delay(reconnect => RECONNECT_TIMEOUT);
    },
    Filter => "POE::Filter::Stream",
    # custom events this session accepts in addition to the built-in ones
    InlineStates => {
        keepalive    => \&keepalive,
        send_request => \&send_request,
    },
    ServerInput => sub {
        # array slice (@_[...]) so both values are actually unpacked
        my ($kernel, $input) = @_[KERNEL, ARG0];
        # $input would be the remote server's ACK/NAK response..handle
        # accordingly
        $kernel->delay(keepalive => INACTIVITY_TIMEOUT);
    },
    ServerError => sub {
        my ($operation, $errno, $errstr, $id) = @_[ARG0..ARG3];
        # a "read" error with errno 0 is reported as end-of-file
        if ($operation eq "read" and $errno == 0) {
            print "EOF encountered!\n";
        }
        else {
            print "$id encountered $operation error $errno: $errstr\n";
        }
    },
);
# _start handler: creates one POE::Wheel::FollowTail per entry in
# %logs_to_watch. New data is delivered as "<service>_record"; resets and
# errors go to the shared "log_reset" / "log_error" events.
# Each wheel and its service name are stored in the heap keyed by wheel ID
# so the generic handlers can look them up later.
sub begin_watchers {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    # sorted keys give a deterministic creation order
    for my $service (sort keys %logs_to_watch) {
        my $log_watcher = POE::Wheel::FollowTail->new(
            Filename   => $logs_to_watch{$service},
            # based on what the purpose is
            Filter     => POE::Filter::Stream->new(),
            InputEvent => $service . "_record",
            ResetEvent => "log_reset",
            ErrorEvent => "log_error",
        );
        $heap->{services}->{$log_watcher->ID} = $service;
        $heap->{watchers}->{$log_watcher->ID} = $log_watcher;
    }
    return;
}
# Shared "log_reset" handler: looks up the service name registered for the
# wheel ID in ARG0 and prints a GMT-stamped notice that its log was reset.
sub generic_log_reset {
    my $service_name = $_[HEAP]{services}{ $_[ARG0] };
    my $timestamp    = gmtime;    # scalar assignment => formatted date string
    print "--- $service_name log reset at ", $timestamp, " GMT\n";
}
# Shared "log_error" handler: reports the failed operation for the service
# tied to the wheel ID in ARG3, then drops both the service name and the
# stored wheel reference from the heap.
sub generic_log_error {
    my ($heap, $operation, $errno, $error_string, $wheel_id)
        = @_[HEAP, ARG0 .. ARG3];
    my $service = $heap->{services}->{$wheel_id};
    print "--- $service log $operation error $errno: $error_string\n";
    print "--- Shutting down $service log watcher.\n";
    delete $heap->{services}->{$wheel_id};
    delete $heap->{watchers}->{$wheel_id};
    return;
}
# Handler for "log_one_record": prints the data read from log_one and,
# when it contains "log_one_string", forwards it to the TCP client session
# via its "send_request" event.
sub log_one_got_record {
    my $log_record = $_[ARG0];
    print "log_one_got_record received: $log_record\n";
    if ($log_record =~ /log_one_string/) {
        # notifies $client of the information added
        $_[KERNEL]->post('TCPClient', 'send_request', $log_record);
    }
    return;
}
# Handler for "log_two_record": prints the data read from log_two and,
# when it contains "log_two_string", forwards it to the TCP client session
# via its "send_request" event.
sub log_two_got_record {
    my $log_record = $_[ARG0];
    print "log_two_got_record received: $log_record\n";
    if ($log_record =~ /log_two_string/) {
        # notifies $client of the information added
        $_[KERNEL]->post('TCPClient', 'send_request', $log_record);
    }
    return;
}
# "keepalive" state of the TCP client session: sends a keepalive message
# to the remote server and re-arms the timer for INACTIVITY_TIMEOUT seconds.
sub keepalive {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    # nothing to ping if the connection has gone away in the meantime
    return unless $heap->{server};
    #format keepalive(i.e. ping) message here
    my $keepalive = '';
    $heap->{server}->put($keepalive);
    $kernel->delay(keepalive => INACTIVITY_TIMEOUT);
    return;
}
# "send_request" state of the TCP client session: writes the request in
# ARG0 to the remote server. If there is currently no server connection
# the request is dropped with a warning instead of dying on an undefined
# wheel.
sub send_request {
    my ($heap, $request) = @_[HEAP, ARG0];
    unless ($heap->{server}) {
        warn "send_request: not connected, request dropped\n";
        return;
    }
    print "Sent from KohaClient to EMS: $request\n";
    $heap->{server}->put($request);
    return;
}
# Hand control to the POE event loop now that all sessions are created.
POE::Kernel->run();