Update: Added a next statement so that a worker sleeps only when its inbox is empty.
The following is a demonstration using Parallel::ForkManager + MCE::Shared + Foo::Inbox.
use strict;
use warnings;
use Foo::Inbox;
use Parallel::ForkManager;
use MCE::Shared;
use List::Util 'shuffle';
use Time::HiRes 'sleep';
my $inbox = MCE::Shared->share( Foo::Inbox->new() );
my @names = shuffle qw/ Barny Betty Fred Wilma /;
my $index = 0;
$| = 1;
sub foo {
    my $name = shift;
    my $count = 0;
    # remove my name from the list
    @names = grep { $_ ne $name } shuffle @names;
    # send greeting to names on the list
    $inbox->send($name, \@names, 'Hello');
    while ( 1 ) {
        if ( my ($from, $data) = $inbox->recv($name) ) {
            # so soon, alrighty then ;-)
            last if $data->[0] eq 'END';
            # display the message received
            printf "%-5s received %s from %s\n", $name, $data->[0], $from;
            # forward the message to another worker
            $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                if ( $from eq 'manager' );
            # check the inbox again right away; sleep only when it is empty
            next;
        }
        sleep 0.01;
    }
}
my $pm = Parallel::ForkManager->new(scalar @names);
$pm->set_waitpid_blocking_sleep(0);
foreach my $name ( @names ) {
    $pm->start($name) and next;
    # Have the shared-manager assign a data channel for IPC.
    # This is done automatically for MCE, MCE::Hobo, and threads.
    MCE::Shared->init();
    # Enter the foo routine.
    foo($name);
    # The first argument to finish is the exit code, not the identifier.
    $pm->finish(0);
}
# Enter message or type quit to terminate the script.
while ( my $msg = <STDIN> ) {
    chomp $msg;
    next unless ( length $msg );
    $inbox->end(), last() if ( $msg eq 'quit' );
    $inbox->send('manager', $names[ ++$index % @names ], $msg);
}
$pm->wait_all_children;
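The effect of the next statement mentioned in the update can be seen in isolation with a small sketch. This is not Foo::Inbox; the @polls array is a hypothetical stand-in where each element is the result of one poll and undef means the inbox was empty at that moment. The point is only that a worker naps when a poll comes back empty, never between two messages that are already waiting.

```perl
use strict;
use warnings;
use Time::HiRes 'sleep';

# Hypothetical stand-in for successive $inbox->recv() results.
# undef simulates a poll that found the inbox empty.
my @polls = ( 'Hello', 'Hi', undef, 'Bye', 'END' );

my @seen;
my $naps = 0;

while ( @polls ) {
    my $msg = shift @polls;
    if ( defined $msg ) {
        last if $msg eq 'END';
        push @seen, $msg;
        # A message arrived, so poll again immediately.
        next;
    }
    # Reached only when the poll came back empty.
    $naps++;
    sleep 0.01;
}

printf "handled %d messages with %d nap(s)\n", scalar @seen, $naps;
```

Without the next, the loop would fall through to the sleep after every message, adding roughly 10 ms of latency per message even when more are queued.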
MCE::Signal takes care of signal handling behind the scenes if one were to press Ctrl-C (SIGINT). Ditto for the process receiving SIGQUIT or SIGTERM. What is nice about this arrangement is that MCE::Shared and MCE::Signal (loaded by MCE::Shared::Server) complement Parallel::ForkManager quite nicely ;-).
For SIGPIPE to work reliably, STDOUT must be unbuffered ($| = 1). Then the following is possible on any Unix platform. The script terminates gracefully, including the tempdir removal inside Parallel::ForkManager.
perl script.pl | head -5
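Why the unbuffered handle matters can be sketched with core Perl alone. This is an illustration under assumptions, not what MCE::Signal does internally: the pipe with its read end already closed is a hypothetical stand-in for head having exited, and autoflush on the write handle plays the role of $| = 1 on STDOUT. Unix only.

```perl
use strict;
use warnings;
use IO::Handle;

# Stand-in for "script.pl | head -5" after head has exited:
# a pipe whose reading end is already closed.
pipe( my $reader, my $writer ) or die "pipe failed: $!";
close $reader;

my $sigpipe_seen = 0;
$SIG{PIPE} = sub { $sigpipe_seen = 1 };

# Same idea as $| = 1 on STDOUT: each print is written immediately,
# so the broken pipe is noticed at the print itself.
$writer->autoflush(1);

my $ok = print {$writer} "hello\n";

printf "print succeeded: %s, SIGPIPE seen: %s\n",
    ( $ok ? 'yes' : 'no' ), ( $sigpipe_seen ? 'yes' : 'no' );
```

With buffering left on, the print would merely fill Perl's buffer and report success; the signal would show up only at a later flush or close, which is too late for a prompt cleanup.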
Regards, Mario