noviceuser has asked for the wisdom of the Perl Monks concerning the following question:
I am trying to run a subroutine called regressions on the different lists in @testLists in parallel, but I want to run only 2 lists at a time. The number of lists in the array @testLists can be any number n. Here is the code I have written so far, but it runs all the lists in parallel at once:
# Forks one child per entry of @testLists with no limit on how many run at
# once, then waits for every child to exit.
my @testLists;
foreach my $list (@testLists) {
# One fork per list; nothing here caps the number of concurrent children.
my $pid = fork;
# NOTE(review): fork returns undef on failure and undef == 0 is true, so a
# failed fork would send the parent into the child branch -- confirm/guard.
if ($pid == 0) {
# Child branch: this changes only the child's copy of $status; the child
# exits right after, so the parent never sees the new value.
$status += regressions($opts, $list);
exit;
}
# Parent branch: remember the child so it can be waited for below.
push(@pids, $pid);
}
# Block until each recorded child has exited.
foreach my $pid (@pids) {
waitpid($pid, 0);
}
Re: running a function for different list parallely using fork
by hippo (Bishop) on Mar 15, 2023 at 10:22 UTC
|
Have you considered Parallel::ForkManager? This is precisely the sort of problem it is designed to solve.
| [reply] |
|
# Runs regressions() for each list in a forked child, at most 2 at a time.
use Parallel::ForkManager qw( );
my @testLists = ...;
# The constructor argument (2) is the cap on simultaneously running children.
my $pm = Parallel::ForkManager->new( 2 );
for my $list ( @testLists ) {
# start() is true in the parent, which moves on to the next list; only the
# child falls through to the lines below.
$pm->start and next;
# Child: this changes only the child's copy of $status.
$status += regressions( $opts, $list );
# Ends the child process.
$pm->finish;
}
# Parent blocks here until every child has finished.
$pm->wait_all_children();
That said, you are changing $status in the child to no effect. Did you mean to exit with that code?
# Same 2-at-a-time loop, but each child reports its result to the parent
# through its exit code, which the parent sums into $status.
use Parallel::ForkManager qw( );
my @testLists = ...;
# Parent-side running total of the children's exit codes.
my $status = 0;
my $pm = Parallel::ForkManager->new( 2 );
# Callback run in the parent for each finished child.
$pm->run_on_finish(sub {
my ( $pid, $exit_code, $ident ) = @_;
$status += $exit_code;
});
for my $list ( @testLists ) {
# Parent continues with the next list; the child falls through.
$pm->start and next;
# Child-local $status; it shadows the parent's total declared above.
my $status = regressions( $opts, $list );
# Clamp to 255, the largest value a process exit code can carry.
$status = 255 if $status > 255;
# End the child, using the (clamped) result as its exit code.
$pm->finish( $status );
}
$pm->wait_all_children();
P::FM also facilitates passing more complex values back to the parent.
# Runs regressions() for each list in a forked child, at most 2 at a time,
# and sends each child's result back to the parent as a data structure
# rather than squeezing it into the exit code.
use Parallel::ForkManager qw( );
my @testLists = ...;
# Parent-side running total of the results reported by the children.
my $status = 0;
my $pm = Parallel::ForkManager->new( 2 );
# Callback run in the parent for each finished child.
$pm->run_on_finish(sub {
    my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data ) = @_;
    if ( $exit_code || $exit_signal || $core_dump ) {
        # Handle error.
    }
    # $data is undef when the child ended without passing a reference to
    # finish() (for example because it died), so only add a result that
    # was actually sent.
    $status += $data->{ status } if defined $data;
});
for my $list ( @testLists ) {
    # Parent continues with the next list; the child falls through.
    $pm->start and next;
    my $result = regressions( $opts, $list );
    # Exit code 0, plus a hash reference delivered to run_on_finish.
    $pm->finish( 0, { status => $result } );
}
$pm->wait_all_children();
Update: Applied marioroy's fix.
| [reply] [d/l] [select] |
|
use strict;
use warnings;
use MCE::Child qw( );
use Time::HiRes qw( time );

my @testLists = qw( sun moon wind air );
my $opts      = 'foo';
my $status    = 0;    # parent-side total of the workers' results

# Stand-in for the real work: prints the worker's pid and the list name,
# sleeps for one second, and returns 1.
sub regressions {
    my ($opts, $list) = @_;
    print "$$: $list\n";
    sleep 1;    # simulate processing
    return 1;
}

my $start = time();

MCE::Child->init(
    max_workers => 2,    # at most two children at a time
    # Runs in the parent each time a child is reaped.
    on_finish   => sub {
        my ( $pid, $exit_code, $ident, $exit_signal, $error, $data ) = @_;
        if ( $exit_code || $exit_signal || $error ) {
            # Handle error.
        }
        # A child that failed returns nothing, so only add a result that
        # actually came back.
        $status += $data->{ status } if defined $data;
    }
);

for my $list ( @testLists ) {
    MCE::Child->create(sub {
        # Report the real result rather than a fixed 1.
        my $result = regressions( $opts, $list );
        return { status => $result };
    });
}

MCE::Child->wait_all();

print "status: $status\n";
printf "duration: %0.3f seconds\n", time() - $start;
__END__
27481: sun
27482: moon
27483: wind
27484: air
status: 4
duration: 2.013 seconds
MCE
use strict;
use warnings;
use MCE::Loop qw( );
use Time::HiRes qw( time );

my @testLists = ( 'sun', 'moon', 'wind', 'air' );
my $opts      = 'foo';
my $status    = 0;    # parent-side total of the workers' results

# Placeholder workload: announces the worker pid and the list name, pauses
# for one second, then reports 1.
sub regressions {
    my ( $options, $name ) = @_;
    print "$$: $name\n";
    sleep 1;    # simulate processing
    return 1;
}

my $start = time();

# Collector: receives each gathered value and adds it to the total.
my $collect = sub { $status += $_[0] };

# Worker body: with chunk_size 1, $_ is the single list to process.
my $process_one = sub {
    my $list   = $_;
    my $result = regressions( $opts, $list );
    MCE->gather( $result );
};

MCE->new(
    max_workers => 2,
    chunk_size  => 1,
    input_data  => \@testLists,
    gather      => $collect,
    user_func   => $process_one,
)->run();

print "status: $status\n";

my $elapsed = time() - $start;
printf "duration: %0.3f seconds\n", $elapsed;
__END__
27562: sun
27563: moon
27562: wind
27563: air
status: 4
duration: 2.005 seconds
| [reply] [d/l] [select] |
|
|
use strict;
use warnings;
use Parallel::ForkManager qw( );
use Time::HiRes qw( time );

my @testLists = qw( sun moon wind air );
my $opts      = 'foo';
my $status    = 0;    # parent-side total of the children's results

# Stand-in for the real work: prints the child's pid and the list name,
# sleeps for one second, and returns 1.
sub regressions {
    my ($opts, $list) = @_;
    print "$$: $list\n";
    sleep 1;    # simulate processing
    return 1;
}

my $start = time();
my $pm    = Parallel::ForkManager->new( 2 );

# P::FM by default, only waits for its own child processes,
# which is what we want.
# Refer to 'BLOCKING CALLS' in the module documentation.
# Let's decrease the sleep period time (default 1.0).
$pm->set_waitpid_blocking_sleep(0.25);

# Callback run in the parent for each finished child.
$pm->run_on_finish(sub {
    my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data ) = @_;
    if ( $exit_code || $exit_signal || $core_dump ) {
        # Handle error.
    }
    # $data is undef when the child ended without passing a reference to
    # finish() (for example because it died), so only add a result that
    # was actually sent.
    $status += $data->{ status } if defined $data;
});

for my $list ( @testLists ) {
    # Parent continues with the next list; the child falls through.
    $pm->start and next;
    my $result = regressions( $opts, $list );
    # Exit code 0, plus a hash reference delivered to run_on_finish.
    $pm->finish( 0, { status => $result } );
}

$pm->wait_all_children();

print "status: $status\n";
printf "duration: %0.3f seconds\n", time() - $start;
__END__
27470: sun
27471: moon
27472: wind
27473: air
status: 4
duration: 2.257 seconds
| [reply] [d/l] |
|
Re: running a function for different list parallely using fork
by tybalt89 (Monsignor) on Mar 15, 2023 at 10:51 UTC
|
#!/usr/bin/perl
use strict; # https://perlmonks.org/?node_id=11150996
use warnings;

# Runs regressions() once per list in forked children, keeping at most
# $max_children of them alive at any moment.

my @testLists    = 1 .. 7;    # FAKE for testing
my $opts         = 'opts';    # FAKE for testing
my $max_children = 2;         # how many lists may run at the same time
my %pids; # NOTE changed to hash because children finish in any order

foreach my $list (@testLists) {
    # At the cap: block until some child exits, then free its slot.
    while ( keys %pids >= $max_children ) {
        my $reaped = wait;
        if ( $reaped == -1 ) {
            # wait returns -1 when there is no child left to reap. Drop
            # the stale entries instead of looping forever on a hash that
            # would never shrink.
            %pids = ();
            last;
        }
        delete $pids{$reaped};
    }

    if ( my $pid = fork ) {
        # Parent: record the running child.
        $pids{$pid}++;
    }
    elsif ( defined $pid ) {
        # Child: do the work and leave. The return value is not used; a
        # variable assigned here would vanish with the child process.
        regressions($opts, $list);
        exit;
    }
    else {
        die "fork failed $!";
    }
}

# Reap whatever is still running once every list has been started.
foreach my $pid ( keys %pids ) {
    waitpid($pid, 0);
}

# Test stand-in: logs start and end with a timestamp and the pid, sleeping
# 1 to 3 seconds in between.
sub regressions # FIXME for testing
{
    print time, " pid $$ starting on (@_)\n";
    sleep 1 + int rand 3;
    print time, " pid $$ ending on (@_)\n";
}
Sample Output:
1678877305 pid 65651 starting on (opts 1)
1678877305 pid 65652 starting on (opts 2)
1678877307 pid 65651 ending on (opts 1)
1678877307 pid 65653 starting on (opts 3)
1678877308 pid 65652 ending on (opts 2)
1678877308 pid 65653 ending on (opts 3)
1678877308 pid 65654 starting on (opts 4)
1678877308 pid 65655 starting on (opts 5)
1678877310 pid 65655 ending on (opts 5)
1678877310 pid 65656 starting on (opts 6)
1678877311 pid 65654 ending on (opts 4)
1678877311 pid 65657 starting on (opts 7)
1678877312 pid 65656 ending on (opts 6)
1678877313 pid 65657 ending on (opts 7)
| [reply] [d/l] [select] |
|
#!/usr/bin/perl
# Same 2-at-a-time fork loop, with each child sending its result to the
# parent over an MCE::Channel; the parent sums the results into $status.
use strict; # https://perlmonks.org/?node_id=11150996
use warnings;
use MCE::Channel;
my @testLists = 1 .. 7; # FAKE for testing
my $opts = 'opts'; # FAKE for testing
my %pids; # NOTE changed to hash because children finish in any order
# Parent-side total of the values received from the children.
my $status = 0;
my $chnl = MCE::Channel->new();
foreach my $list (@testLists)
{
# Two children already running: reap one before starting another.
while( keys %pids >= 2 )
{
delete $pids{ +wait };
# One receive attempt per reaped child; the value is only added when
# something was actually received (recv_nb result is defined).
my $val = $chnl->recv_nb();
$status += $val if defined($val);
}
if( my $pid = fork )
{
# Parent: record the running child.
$pids{ $pid }++;
}
elsif( defined $pid )
{
# Child: this $status is local to the child and shadows the parent's total.
my $status = regressions($opts, $list);
# Send the result before exiting so the parent can pick it up after reaping.
$chnl->send($status);
exit;
}
else
{
die "fork failed $!";
}
}
# Reap the children still running and collect one value for each.
foreach my $pid ( keys %pids )
{
waitpid($pid, 0);
my $val = $chnl->recv_nb();
$status += $val if defined($val);
}
print "status: $status\n";
# Test stand-in: logs start and end with a timestamp and the pid, sleeps
# 1 to 3 seconds in between, and returns 1.
sub regressions # FIXME for testing
{
print time, " pid $$ starting on (@_)\n";
sleep 1 + int rand 3;
print time, " pid $$ ending on (@_)\n";
return 1;
}
__END__
1678939864 pid 27850 starting on (opts 1)
1678939864 pid 27851 starting on (opts 2)
1678939866 pid 27851 ending on (opts 2)
1678939866 pid 27852 starting on (opts 3)
1678939867 pid 27850 ending on (opts 1)
1678939867 pid 27853 starting on (opts 4)
1678939868 pid 27853 ending on (opts 4)
1678939868 pid 27854 starting on (opts 5)
1678939869 pid 27852 ending on (opts 3)
1678939869 pid 27855 starting on (opts 6)
1678939871 pid 27855 ending on (opts 6)
1678939871 pid 27854 ending on (opts 5)
1678939871 pid 27856 starting on (opts 7)
1678939872 pid 27856 ending on (opts 7)
status: 7
| [reply] [d/l] |
|
And here's the Forking::Amazing version. See Re: figuring out Parallel::ForkManager slots for the Forking::Amazing module, which I wrote
because I didn't particularly care for Parallel::ForkManager. I think Forking::Amazing makes it clearer what runs in the child and
what runs in the parent, and makes it easier to pass results from the child back to the parent.
#!/usr/bin/perl
# Runs regressions() for each entry of @testLists through
# Forking::Amazing::run with a limit of 2 forks, summing the results.
use strict; # https://perlmonks.org/?node_id=11150996
use warnings;
use Forking::Amazing;
my @testLists = 1 .. 10; # FAKE for testing
my $opts = 'opts'; # FAKE for testing
# Total built up by the parent callback below.
my $status = 0;
# NOTE(review): the callback contract is not visible here. Presumably the
# child callback receives the per-fork argument in @_, and the parent
# callback receives the child's returned array reference as its second
# argument -- confirm against the Forking::Amazing source.
Forking::Amazing::run
2, # max forks
sub { return [ regressions($opts, @_) ]; }, # child
sub { $status += $_[1][0]; }, # parent
@testLists; # argument for each fork
print "status $status\n";
# Test stand-in: logs start and end with a timestamp and the pid, pauses
# for 1 + rand 3 seconds via four-argument select, and returns its last
# argument.
sub regressions # FIXME for testing
{
print time, " pid $$ starting on (@_)\n";
select undef, undef, undef, 1 + rand 3;
print time, " pid $$ ending on (@_)\n";
return pop;
}
| [reply] [d/l] |
|
|