Update: Added size method to Inbox. Updated recv method to guard against autovivification. Renamed quit method to end for consistency with other helper classes. Finally, changed 'quit' to 'END' inside the demo script.
Update: Added next statement in demo (2nd source) so that a worker sleeps only when its inbox is empty.
Regarding MCE::Shared, the following is an Inbox module suitable for sharing. Once shared, the object enables inter-process communication between workers, whether they run as threads or as child processes.
package Foo::Inbox;

use strict;
use warnings;

our $VERSION = '0.003';

# $inbox = Foo::Inbox->new();
# Slot 0 holds the per-recipient queues; slot 1 holds the end marker.

sub new {
    bless [ {}, [] ], shift;
}

# $scalar = $inbox->size( [ $key ] );
# %pairs  = $inbox->size();

sub size {
    my ( $self, $key ) = @_;

    if ( defined $key ) {
        exists $self->[0]{$key} ? scalar @{ $self->[0]{$key} } : 0;
    }
    elsif ( wantarray ) {
        local $_;
        map { $_ => scalar @{ $self->[0]{$_} } } keys %{ $self->[0] };
    }
    else {
        my $size = 0;
        foreach my $key ( keys %{ $self->[0] } ) {
            $size += scalar @{ $self->[0]{$key} };
        }
        $size;
    }
}

# $inbox->send( $from, $to, $arg1, ... );
# $inbox->send( $from, \@list, $arg1, ... );

sub send {
    my ( $self, $from, $to ) = ( shift, shift, shift );
    my $mesg = [ $from, [ @_ ] ];

    if ( ref $to eq 'ARRAY' ) {
        push @{ $self->[0]{$_} }, $mesg for @{ $to };
    }
    else {
        push @{ $self->[0]{$to} }, $mesg;
    }

    return;
}

# ( $sender, $data ) = $inbox->recv( $from );
# Here $from is the name of the inbox to read from, i.e. the caller's own name.
# Returns an empty list when nothing is waiting and end has not been called.

sub recv {
    my ( $self, $from ) = @_;

    # check with exists first so that polling never autovivifies a key
    return @{ $self->[1] } unless exists $self->[0]{ $from };

    @{ shift @{ $self->[0]{ $from } } // $self->[1] };
}

# $inbox->end();

sub end {
    $_[0]->[1] = [ 'manager', [ 'END' ] ];
    return;
}

1;
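The exists check in recv is there because dereferencing a missing hash element in a modifying context, such as shift, creates the element as a side effect. Below is a minimal sketch of that behavior using a plain hash; %box and the names are only illustrative and are not part of the module.

```perl
use strict;
use warnings;

my %box;

# Unguarded: shift needs a modifiable array, so Perl creates $box{Fred}.
my $msg = shift @{ $box{Fred} };
print exists $box{Fred} ? "Fred was autovivified\n" : "Fred is absent\n";

# Guarded, in the style of recv: test with exists before dereferencing.
$msg = exists $box{Wilma} ? shift @{ $box{Wilma} } : undef;
print exists $box{Wilma} ? "Wilma was autovivified\n" : "Wilma is absent\n";
```

Without the guard, every poll for an unknown name would leave an empty queue behind, and size in list context would report it.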
A worker may send a data structure or a list (not shown here). When it does, MCE::Shared handles serialization automatically, using Sereal 3.015+ if available and Storable otherwise.
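As a rough illustration of sending a data structure or a list, here is a sketch that runs in a single process without MCE::Shared. My::Inbox is a hypothetical, condensed stand-in for Foo::Inbox (new, send and recv only) so that the snippet runs on its own. One difference to keep in mind: unshared, the receiver gets the very same reference, whereas through a shared object the data would be serialized and the receiver would get a copy.

```perl
use strict;
use warnings;

# Hypothetical condensed stand-in for Foo::Inbox, included only so that
# this snippet is self-contained. With the real module, use Foo::Inbox.
{
    package My::Inbox;

    sub new { bless [ {}, [] ], shift }

    sub send {
        my ( $self, $from, $to ) = ( shift, shift, shift );
        my $mesg = [ $from, [ @_ ] ];
        push @{ $self->[0]{$_} }, $mesg
            for ( ref $to eq 'ARRAY' ? @{ $to } : $to );
        return;
    }

    sub recv {
        my ( $self, $name ) = @_;
        return @{ $self->[1] } unless exists $self->[0]{$name};
        @{ shift @{ $self->[0]{$name} } // $self->[1] };
    }
}

my $inbox = My::Inbox->new();

# A hash ref and a plain list are passed as ordinary arguments.
$inbox->send( 'Fred', 'Wilma', { task => 'shop', items => [ 'milk', 'eggs' ] } );
$inbox->send( 'Fred', 'Wilma', 'a', 'b', 'c' );

# First message: $data->[0] is the hash ref.
my ( $from, $data ) = $inbox->recv('Wilma');
my $kind  = ref $data->[0];
my $count = scalar @{ $data->[0]{items} };
print "$from sent a $kind with $count items\n";

# Second message: $data holds the list itself.
( $from, $data ) = $inbox->recv('Wilma');
my $list = join ',', @{ $data };
print "$from sent the list $list\n";
```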
use strict;
use warnings;

use Foo::Inbox;
use MCE::Hobo;
use MCE::Shared;

use List::Util 'shuffle';
use Time::HiRes 'sleep';

my $inbox = MCE::Shared->share( Foo::Inbox->new() );
my @names = shuffle qw/ Barny Betty Fred Wilma /;
my $index = 0;

$| = 1;

sub foo {
    my $name = shift;
    my $count = 0;

    # remove my name from the list
    @names = grep { $_ ne $name } shuffle @names;

    # send greeting to names on the list
    $inbox->send($name, \@names, 'Hello');

    while ( 1 ) {
        if ( my ($from, $data) = $inbox->recv($name) ) {
            # so soon, alrighty then ;-)
            last if $data->[0] eq 'END';

            # display the message received
            printf "%-5s received %s from %s\n", $name, $data->[0], $from;

            # forward the message to another worker
            $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                if ( $from eq 'manager' );

            next;
        }
        sleep 0.01;
    }
}

MCE::Hobo->create(\&foo, $_) for @names;

# Enter message or type quit to terminate the script.
while ( my $msg = <STDIN> ) {
    chomp $msg;
    next unless ( length $msg );
    $inbox->end(), last() if ( $msg eq 'quit' );
    $inbox->send('manager', $names[ ++$index % @names ], $msg);
}

MCE::Hobo->waitall;
In the terminal, I entered words from the song Hello by Adele: Hello, it's me -- Hello, can you hear me?. Then I entered quit to exit the application.
Betty received Hello from Barny
Wilma received Hello from Barny
Fred received Hello from Barny
Wilma received Hello from Betty
Barny received Hello from Betty
Betty received Hello from Wilma
Fred received Hello from Betty
Betty received Hello from Fred
Fred received Hello from Wilma
Wilma received Hello from Fred
Barny received Hello from Wilma
Barny received Hello from Fred
Hello, it's me
Betty received Hello, it's me from manager
Barny received Hello, it's me from Betty
Hello, can you hear me?
Wilma received Hello, can you hear me? from manager
Fred received Hello, can you hear me? from Wilma
quit
A subsequent version of Inbox might provide blocking capabilities so that workers need not poll. Doing so means creating many pipes or fifos, and I'm not sure that is worthwhile. Imagine a future server with 400 logical cores. Creating all those pipes or fifos seems excessive, imho.
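To make the cost concrete, here is a minimal sketch of what blocking on a pipe could look like for a single recipient, using only core Perl (pipe, fork, sysread). It is not how Inbox or MCE::Shared work; the handle names are hypothetical, and a real version would need one such wake-up pipe per worker, which is the part that seems excessive at 400 workers.

```perl
use strict;
use warnings;

# One wake-up pipe for the single recipient in this sketch, plus a
# second pipe so that the worker can report back to the parent.
pipe( my $wake_rd, my $wake_wr ) or die "pipe: $!";
pipe( my $done_rd, my $done_wr ) or die "pipe: $!";

my $pid = fork() // die "fork: $!";

if ( $pid == 0 ) {
    # Worker: close the ends it does not use.
    close $wake_wr;
    close $done_rd;

    # Blocks here, without polling, until the parent writes one byte.
    my $n = sysread( $wake_rd, my $byte, 1 );
    syswrite( $done_wr, $n ? "woke\n" : "eof\n" );
    exit 0;
}

# Parent (manager): close the ends it does not use.
close $wake_rd;
close $done_wr;

# Notify the worker that a message is waiting.
syswrite( $wake_wr, "1" );

my $reply = <$done_rd>;
waitpid( $pid, 0 );

print "worker $reply";
```

The trade-off is the one described above: the worker no longer sleeps in 0.01 second steps, but every worker needs its own pair of file handles.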
Update: I've benchmarked Inbox to determine whether pipes or fifos are needed. Adding the next statement lets the demo script run reasonably fast when the inbox is filled with messages or when many workers send simultaneously to the same recipient. If needed, two or more workers can also read from the same inbox in parallel.
Regards, Mario