smarthacker67 has asked for the wisdom of the Perl Monks concerning the following question:
HI ,
I need help achieving the child process to child process communication.Forked a multiple child, now lets say PID 1,PID 2,PID 3
I wish to send command fetched on PID1 (saying I want to connect to PID 3) to PID 3 how to achieve this ?
Tried searched a lot about Perl Pipes ,IPC but couldn't succeed.
basically I can read command on PID1 but want to send it to particular PID XX how to select such PID ?
Thanks in advance.
Re: Child process inter communication
by zentara (Cardinal) on Jun 13, 2017 at 13:11 UTC
|
Hi, these following codes have been posted before in some lost node numbers. But here is the way to do it:
Also see Fork and wait question. You can setup 2 way pipes too, so you can have bi-directional communication between
parent and child.
#!/usr/bin/perl
use warnings;
use strict;
#piping all child output to parent
# by Zaxo of perlmonks
# we'll define some subroutiness to act as children
# and open a single pipe.
my @kids = (
sub { print "First!", $/; },
sub { print "Second!", $/; },
sub { print "Third!", $/; },
);
pipe my ( $in, $out );
# Now we start breeding children. Each will close the $in handle,
# select the $out handle as the default, set up autoflush on it,
# and then call their sub. We definitely need to prevent zombies,
# since the kids will all finish before the parent does, so we
# start by declaring a hash where we will keep their pid's.
my %kid;
for (@kids) {
my $cpid = fork;
defined $cpid or warn("Couldn't fork"), next;
$cpid or do { # child
close $in or die $!;
select $out;
$| = 1;
$_->();
exit 0;
};
$kid{$cpid} = undef;
}
# The select statement is the one-arg kind, which windows should
# handle ok. Back in the parent, now, we have an unwanted $out handle,
# a single $in handle that the kids are all fighting to talk on, and
# a hash to remind us of the kids' names. Wrapping up, we close $out,
# listen to the kids in turn, decorate their messages to show
# who's repeating them, and finally call wait enough times to
# bury them all safely. We must be careful not to die before that.
close $out or warn $!;
s/.$/??? says the child/, print while <$in>;
delete $kid{ +wait } while %kid;
# Many-to-one pipes like this will keep their messages' integrity
# in flush-sized chunks. So long as they are shorter than than
# an I/O buffer and end with $/, they will be read as distinct.
# You may want to have the kids tag their messages to
# identify the source.
# I've tested this on Windows XP and Linux. It works on both.
or simpler, pipe all output to parent
#!/usr/bin/perl
use strict;
use warnings;
my $pid = open(CHILD, "-|");
if ($pid)
{
# parent
print "parent got:\n";
print while(<CHILD>);
close CHILD; # this waits on child
}
elsif ($pid == 0)
{
# child
print "kid here!\n";
exec '/bin/date' or die "Can't exec date $!\n";
}
else
{
die "fork error: $!\n";
}
| [reply] [d/l] [select] |
Re: Child process inter communication
by Corion (Patriarch) on Jun 13, 2017 at 09:04 UTC
|
What do you mean by "select such PID"?
Are you asking about a good algorithm to do load scheduling? Or do you want to know how to pass information from your master process to child process XX?
Personally, I would look into queues, and invert the problem. Have the master post jobs into the queue and the workers take jobs from that queue. That way ou don't have idle workers.
Such a setup can range from directories into which jobs are placed and moved via rename by the clients to various Queues. Depending on your needs about the queues (persistence, deliver at most once, deliver at least once), you could use Forks::Queue or Thread::Queue or Queue::DBI or IPC::DirQueue, or ZMQ or MQSeries::Queue.
| [reply] |
A reply falls below the community's threshold of quality. You may see it by logging in. |
Re: Child process inter communication
by hippo (Archbishop) on Jun 13, 2017 at 08:56 UTC
|
TIMTOWTDI. There are so many approaches for this and many of them are discussed in perlipc - have you read it? Did you try the examples in it? In what way were you unable to apply these examples to your specific problem?
| [reply] |
Re: Child process inter communication
by tybalt89 (Monsignor) on Jun 14, 2017 at 01:42 UTC
|
#!/usr/bin/perl
# http://perlmonks.org/?node_id=1192662
use strict;
use warnings;
use IO::Select;
my $childcount = 3;
my $hasterminal = 1;
my %who;
my %pipes;
for my $from (1 .. $childcount)
{
for my $to (1 .. $from - 1, $from + 1 .. $childcount)
{
pipe( my $readhandle, my $writehandle) or die "$! on pipe";
$writehandle->autoflush;
$pipes{$from}{$to}{rh} = $readhandle;
$pipes{$from}{$to}{wh} = $writehandle;
}
}
for my $me (1 .. $childcount)
{
if( my $pid = fork ) # parent
{
$who{$pid} = $me;
}
elsif( defined $pid ) # child
{
my $sel = IO::Select->new;
$me == $hasterminal and $sel->add(*STDIN);
for my $from (1 .. $me - 1, $me + 1 .. $childcount)
{
$sel->add($pipes{$from}{$me}{rh});
close $pipes{$from}{$me}{wh};
}
while(1)
{
for my $handle ($sel->can_read)
{
defined( my $command = <$handle> ) or exit;
print "$me got $command";
$command =~ /^(\d+)\s+(.*\n)/ and $1 != $me and
print { $pipes{$me}{$1}{wh} } $2;
}
}
}
else
{
die "fork failed with $!";
}
}
use Data::Dump 'pp'; pp \%who;
my $pid = wait; # on first exit, kill rest
print "$who{$pid} exited\n";
kill 15, keys %who;
print "$who{$pid} exited\n" while ($pid = wait) > 0;
$hasterminal determines which child listens to the terminal for commands.
Commands with a leading number are forwarded to that child.
If you type in
2 3 foobar
you get back
1 got 2 3 foobar
2 got 3 foobar
3 got foobar
showing that child 1 got the message, then forwarded it to child 2, who then forwarded it to child 3.
Any child can send directly to any other child using the pipes set up in the hash %pipes before the children are forked.
Note that this is just a prototype and should not be considered production code (there's a bit of cheating going on in it :)
| [reply] [d/l] [select] |
Re: Child process inter communication
by smarthacker67 (Beadle) on Jun 14, 2017 at 06:58 UTC
|
Thanks for all your replies.
I should have mentioned that in my case child has been forked already and now I wish to communicate with specific.
need Child to Child communication.
kindly check the comment section below
Suggestion for alternate approach is most welcome as well.
sub make_new_child {
# Make new child...
my $pid;
my $sigset;
# block signal for fork
$sigset = POSIX::SigSet->new(SIGINT);
sigprocmask(SIG_BLOCK, $sigset)
or die "Can't block SIGINT for fork: $!\n";
die "fork: $!" unless defined ($pid = fork);
if ($pid) {
# Parent records the child's birth and returns.
sigprocmask(SIG_UNBLOCK, $sigset)
or die "Can't unblock SIGINT for fork: $!\n";
print "PID $pid\n" if $Debug;
$children{$pid} = 1;
$children++;
return;
} else {
print "Child....\n" if $Debug;
$SIG{INT} = 'DEFAULT'; # make SIGINT kill us as it did be
+fore
my $request = ""; # full long big request
my $nreads = 0; # number of read loops
my $tmout = 150; # timeout in second
# unblock signals
sigprocmask(SIG_UNBLOCK, $sigset)
or die "Can't unblock SIGINT for fork: $!\n";
# handle connections until we've reached $MAX_CLIENTS_PER_CHIL
+D
for (my $i=0; $i < $MAX_CLIENTS_PER_CHILD; $i++) {
# my $device="";
print "New connection....\n" if $Debug;
my $client = $Socket->accept() or last;
$client->autoflush(1);
($Debug && $client) ? print "Accepted....\n" : print "No S
+ignal Recieved\n";
# do something with the connection
my $sel = IO::Select->new($client);
while ( $sel->can_read($tmout) )
{
my $n = sysread($client, $request, 9999,length($re
+quest));
last if ($n==0);
$nreads++;
my $display = "";
if ( $nreads == 0 ) {
$display = '[timeout]';
} elsif ( length $request == 0 ) {
$display = '[empty]';
} else {
$display = $request;
}
$request="";
print "$display\n" if $Debug;
my @data = split(/,/,$display);
if($data[0] eq '!!'){
# A command
####
####Received the command here to send it t
+o child PID 123
###### child PID 123 has been already fork
+ed and free
###### to serve this request
print "A command, Lets send\n";
my $to_pid = $data[1];
my $command = $data[2];#__________________
+___________________________________________________
#
# Lets send command to $to_pid from here
#_________________________________________
+____________________________
# qx (echo e)
print CHLD_IN "$command $pid sent from $p
+id \n";
#print FHY ("OK, Lets Send coand\n");
sleep(3);
} else {
}
}
}
exit(0);
}
}
| [reply] [d/l] |
|
Your code is quite long, and dosn't actually run as an complete example. If you want communications between the children, then create pipes in the parent, and pass them to the children as you fork them, pass the filedesciptor number in many cases.
Look at Parallel::ForkManager + MCE::Shared demonstration for a state-of-the-art solution, or you could just switch to threads from forks, and use thread::shared variables.
Or forking with Storable and IPC::ShareLite. Otherwise, start looking at how you make pipes, and pass them off to child processes. Also see FileHandles and threads
| [reply] |
|
Update: Added size method to Inbox. Updated recv method to safeguard from autovivification. Renamed quit method to end for consistency with other helper classes. Finally, changed 'quit' to 'END' inside the demo script.
Update: Added next statement in demo (2nd source) to not delay unless the inbox is empty.
Regarding MCE::Shared, the following is an Inbox module suitable for sharing. Once shared, the object enables inter-process communication between workers supporting threads and child processes.
package Foo::Inbox;
use strict;
use warnings;
our $VERSION = '0.003';
# $inbox = Foo::Inbox->new();
sub new {
bless [ {}, [] ], shift;
}
# $scalar = $inbox->size( [ $key ] );
# %pairs = $inbox->size();
sub size {
my ( $self, $key ) = @_;
if ( defined $key ) {
exists $self->[0]{$key} ? scalar @{ $self->[0]{$key} } : 0;
}
elsif ( wantarray ) {
local $_;
map { $_ => scalar @{ $self->[0]{$_} } } keys %{ $self->[0] };
}
else {
my $size = 0;
foreach my $key ( keys %{ $self->[0] } ) {
$size += scalar @{ $self->[0]{$key} };
}
$size;
}
}
# $inbox->send( $from, $to, $arg1, ... );
# $inbox->send( $from, \@list, $arg1, ... );
sub send {
my ( $self, $from, $to ) = ( shift, shift, shift );
my $mesg = [ $from, [ @_ ] ];
if ( ref $to eq 'ARRAY' ) {
push @{ $self->[0]{$_ } }, $mesg for @{ $to };
} else {
push @{ $self->[0]{$to} }, $mesg;
}
return;
}
# $inbox->recv( $from );
sub recv {
my ( $self, $from ) = @_;
return @{ $self->[1] } unless exists $self->[0]{ $from };
@{ shift @{ $self->[0]{ $from } } // $self->[1] };
}
# $inbox->end();
sub end {
$_[0]->[1] = [ 'manager', [ 'END' ] ];
return;
}
1;
A worker may send a data structure or a list (not shown here). Doing so, MCE::Shared will handle serialization automatically via Sereal 3.015+ if available or Storable.
use strict;
use warnings;
use Foo::Inbox;
use MCE::Hobo;
use MCE::Shared;
use List::Util 'shuffle';
use Time::HiRes 'sleep';
my $inbox = MCE::Shared->share( Foo::Inbox->new() );
my @names = shuffle qw/ Barny Betty Fred Wilma /;
my $index = 0;
$| = 1;
sub foo {
my $name = shift;
my $count = 0;
# remove my name from the list
@names = grep { $_ ne $name } shuffle @names;
# send greeting to names on the list
$inbox->send($name, \@names, 'Hello');
while ( 1 ) {
if ( my ($from, $data) = $inbox->recv($name) ) {
# so soon, alrighty then ;-)
last if $data->[0] eq 'END';
# display the message received
printf "%-5s received %s from %s\n", $name, $data->[0], $from;
# forward the message to another worker
$inbox->send($name, $names[ ++$index % @names ], $data->[0])
if ( $from eq 'manager' );
next;
}
sleep 0.01;
}
}
MCE::Hobo->create(\&foo, $_) for @names;
# Enter message or type quit to terminate the script.
while ( my $msg = <STDIN> ) {
chomp $msg; next unless ( length $msg );
$inbox->end(), last() if ( $msg eq 'quit' );
$inbox->send('manager', $names[ ++$index % @names ], $msg);
}
MCE::Hobo->waitall;
I've entered in the terminal words from the song Hello by Adele: Hello, it's me -- Hello, can you hear me?. Then entered quit to exit the application.
Betty received Hello from Barny
Wilma received Hello from Barny
Fred received Hello from Barny
Wilma received Hello from Betty
Barny received Hello from Betty
Betty received Hello from Wilma
Fred received Hello from Betty
Betty received Hello from Fred
Fred received Hello from Wilma
Wilma received Hello from Fred
Barny received Hello from Wilma
Barny received Hello from Fred
Hello, it's me
Betty received Hello, it's me from manager
Barny received Hello, it's me from Betty
Hello, can you hear me?
Wilma received Hello, can you hear me? from manager
Fred received Hello, can you hear me? from Wilma
quit
A subsequent version of Inbox might provide blocking capabilities so not to poll. Doing so means having to make many pipes or fifos for which I'm not sure. Imagine a future server having 400 logical cores. Making all those pipes or fifos seems excessive, imho.
Update: I've benchmarked Inbox to determine if having to make pipes or fifos. Adding the next statement enabled the demo script to run reasonably fast if by chance the inbox is filled with messages or from many workers sending simultaneously to the same recipient. It's also possible for two or more workers to read from the same inbox if needed in parallel.
Regards, Mario | [reply] [d/l] [select] |
|
|
|
#!/usr/bin/perl
# http://perlmonks.org/?node_id=1192662
use strict;
use warnings;
use IO::Select;
use POSIX qw(mkfifo);
my $childcount = 3;
my $hasterminal = 1;
my %who;
my $fifodir = '/tmp';
for my $me (1 .. $childcount)
{
if( my $pid = fork ) # parent
{
$who{$pid} = $me;
}
elsif( defined $pid ) # child
{
my $mypath = "$fifodir/$$.fifo";
mkfifo( $mypath, 0700 ) or die "$! on making mypath";
open my $fifo, '+<', $mypath or die "$! opening $mypath";
my $sel = IO::Select->new($fifo);
$me == $hasterminal and $sel->add(*STDIN);
while(1)
{
for my $handle ($sel->can_read)
{
defined( my $command = <$handle> ) or exit;
print "$$ got $command";
if( $command =~ /^(\d+)\s+(.*\n)/ )
{
my $otherpath = "$fifodir/$1.fifo";
if( -p $otherpath and open my $otherchild, '>', $otherpath )
{
print $otherchild $2;
close $otherchild;
}
else
{
warn "child $1 does not have a fifo\n";
}
}
}
}
}
else
{
die "fork failed with $!";
}
}
use Data::Dump 'pp'; pp \%who;
my $pid = wait; # on first exit, kill rest
print "$who{$pid} exited\n";
kill 15, keys %who;
unlink map "$fifodir/$_.fifo", keys %who;
print "$who{$pid} exited\n" while ($pid = wait) > 0;
Enter a pid followed by white space and a message, it will forward the message to that pid.
Example - enter:
25910 somemessage
to get
25908 got 25910 somemessage
25910 got somemessage
Of course, using your actual pids.
| [reply] [d/l] [select] |
|
$me == $hasterminal and $sel->add(*STDIN);
Regards, Mario | [reply] [d/l] |
Re: Child process inter communication
by marioroy (Prior) on Jun 18, 2017 at 23:25 UTC
|
Update: Added Foo::Inbox4 and removed Foo::Inbox3. I've been accustomed to automatic serialization of complex data structure in MCE::Shared that I didn't realize on having to do that manually for Thread::Queue.
Corion mentioned queues. The following provides two queue implementations based on Foo::Inbox.
Foo::Inbox2 using MCE::Shared->queue
Foo::Inbox4 using Thread::Queue
A demo and benchmark will follow in the immediate post(s).
Regards, Mario
| [reply] [d/l] [select] |
|
$ diff hobo_test.pl threads_test.pl
5,6c5,6
< use MCE::Hobo;
< use Foo::Inbox2;
---
> use threads;
> use Foo::Inbox4;
11c11
< my $inbox = Foo::Inbox2->new( @names );
---
> my $inbox = Foo::Inbox4->new( @names );
38,39c38,39
< MCE::Hobo->create(\&foo, $_) for @names;
< MCE::Hobo->waitall;
---
> threads->create(\&foo, $_) for @names;
> $_->join() for threads->list();
Foo::Inbox2 via MCE::Hobo + MCE::Shared->queue 50k test
Foo::Inbox4 via threads + Thread::Queue 50k test
Update: Added results for MCE::Inbox which will ship with MCE::Shared 1.827 on release day. It is Inbox2 with optimizations.
Results from Mac OS X and Cent OS 7.
# Mac OS X ( Perl v5.18.2 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 0.690 seconds
50000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 0.721 seconds
50000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 0.789 seconds
50000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 5.735 seconds
50000
# CentOS 7 VM ( Perl v5.16.3 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 0.834 seconds
50000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 0.726 seconds
50000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 0.945 seconds
50000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 3.020 seconds
50000
Results from Cygwin and Strawberry Perl.
# Windows ( Cygwin Perl v5.22.3 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 1.825 seconds
50000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 2.059 seconds
50000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 2.387 seconds
50000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 24.086 seconds
50000
# Windows ( Strawberry Perl v5.22.2.1 )
$ perl foo1.pl > nul # Foo::Inbox
duration: 1.570 seconds
$ perl inbox.pl > nul # MCE::Inbox w/ blocking capability
duration: 1.664 seconds
$ perl foo2.pl > nul # MCE::Shared->queue
duration: 2.120 seconds
$ perl foo4.pl > nul # Thread::Queue
duration: 2.886 seconds
Results from FreeBSD and Solaris.
# TrueOS 10.0 ( FreeBSD, Perl 5.16.3 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 0.910 seconds
50000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 0.875 seconds
50000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 1.107 seconds
50000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 0.797 seconds
50000
# Solaris 11.2 ( Perl 5.22.2 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 1.319 seconds
50000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 1.344 seconds
50000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 1.525 seconds
50000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 1.822 seconds
50000
From this testing, Threads + Thread::Queue runs better on FreeBSD and Solaris compared to others.
Foo::Inbox lacks blocking capability, thus has lower latency. However, it may run much slower when the Inbox isn't busy.
Regards, Mario | [reply] [d/l] [select] |
|
Please know that the results on this post doesn't count. I've overclocked a system on purpose, mainly for simulating a fast machine in the future. On an 8-core box (total 16 logical cores), I ran the same tests to better understand the impact between the three implementations. The box is overclocked to 4.0 GHz. What is interesting to me is not how fast but rather the impact from running many workers.
Baseline testing involves 4 workers and runs about 2x faster than my laptop. No surprises there due to running at 4 GHz and not from a Linux VM.
Update: Added results for MCE::Inbox which will ship with MCE::Shared 1.827 on release day. It is Inbox2 with optimizations.
# 4 GHz, CentOS 7 ( Perl v5.16.3 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 0.441 seconds
50000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 0.422 seconds
50000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 0.645 seconds
50000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 1.544 seconds
50000
Next is 128 workers retrieving 1.6 million messages.
# 4 GHz, CentOS 7 ( Perl v5.16.3 )
$ perl foo1.pl | wc -l # Foo::Inbox
duration: 12.685 seconds
1600000
$ perl inbox.pl | wc -l # MCE::Inbox w/ blocking capability
duration: 15.939 seconds
1600000
$ perl foo2.pl | wc -l # MCE::Shared->queue
duration: 21.533 seconds
1600000
$ perl foo4.pl | wc -l # Thread::Queue
duration: 90.015 seconds
1600000
Unfortunately, threads may not scale linearly, especially when involving locking. In top, I see the process peak at 306% max while running. It's far from the 1600% mark hardware limit. On the other hand, MCE::Inbox scales nicely considering it does blocking as well.
For the 1.6 million test, I added a loop to generate more names, 128 total.
my @names = shuffle qw/ Barny Betty Fred Wilma /;
foreach my $i ( 1 .. 5 ) {
push @names, map { $_.$i } @names;
print scalar(@names), "\n";
}
__END__
8
16
32
64
128
There was one other change and replaced 4167 with 98 because there are now 128 workers, not 4.
...
# send greeting again
$inbox->send($name, \@names, 'Hello') if $count < 98;
# eventually stop benchmarking
last if ++$count == 12500;
...
Regards, Mario
| [reply] [d/l] [select] |
|
|
$ diff hobo_demo.pl threads_demo.pl
5,6c5,6
< use MCE::Hobo;
< use Foo::Inbox2;
---
> use threads;
> use Foo::Inbox4;
10c10
< my $inbox = Foo::Inbox2->new( @names );
---
> my $inbox = Foo::Inbox4->new( @names );
34c34
< MCE::Hobo->create(\&foo, $_) for @names;
---
> threads->create(\&foo, $_) for @names;
44c44
< MCE::Hobo->waitall;
---
> $_->join() for threads->list();
Foo::Inbox2 via MCE::Hobo + MCE::Shared->queue example
Foo::Inbox4 via threads + Thread::Queue example
Regards, Mario
| [reply] [d/l] [select] |
|
|