Beefy Boxes and Bandwidth Generously Provided by pair Networks
"be consistent"
 
PerlMonks  

running a function for different list parallely using fork

by noviceuser (Acolyte)
on Mar 15, 2023 at 09:50 UTC ( [id://11150996]=perlquestion: print w/replies, xml ) Need Help??

noviceuser has asked for the wisdom of the Perl Monks concerning the following question:

I am trying to run a function/subroutine called regression for different lists under @testLists in parallel , but i want to run only 2 lists at a time, also the number of lists in the array @testLists can be any n number. Here is the code so far i have written but it will run all the lists parallely

my @testLists; foreach my $list (@testLists) { my $pid = fork; if ($pid == 0) { $status += regressions($opts, $list); exit; } push(@pids, $pid); } foreach my $pid (@pids) { waitpid($pid, 0); }

Replies are listed 'Best First'.
Re: running a function for different list parallely using fork
by hippo (Bishop) on Mar 15, 2023 at 10:22 UTC

    Have you considered Parallel::ForkManager? This is precisely the sort of problem it is designed to solve.


    🦛

      Exactly this.

      use Parallel::ForkManager qw( ); my @testLists = ...; my $pm = Parallel::ForkManager->new( 2 ); for my $list ( @testLists ) { $pm->start and next; $status += regressions( $opts, $list ); $pm->finish; } $pm->wait_all_children();

      That said, you are changing $status in the child to no effect. Did you mean to exit with that code?

      use Parallel::ForkManager qw( ); my @testLists = ...; my $status = 0; my $pm = Parallel::ForkManager->new( 2 ); $pm->run_on_finish(sub { my ( $pid, $exit_code, $ident ) = @_; $status += $exit_code; }); for my $list ( @testLists ) { $pm->start and next; my $status = regressions( $opts, $list ); $status = 255 if $status > 255; $pm->finish( $status ); } $pm->wait_all_children();

      P::FM also facilitates passing more complex values back to the parent.

      use Parallel::ForkManager qw( ); my @testLists = ...; my $status = 0; my $pm = Parallel::ForkManager->new( 2 ); $pm->run_on_finish(sub { my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data ) = +@_; if ( $exit_code || $exit_signal || $core_dump ) { # Handle error. } $status += $data->{ status }; }); for my $list ( @testLists ) { $pm->start and next; my $status = regressions( $opts, $list ); $pm->finish( 0, { status => $status } ); } $pm->wait_all_children();

      Update: Applied marioroy's fix.

        See the Parallel::ForkManager demonstration, here. Something similar can be done with MCE::Child. Subsequently, MCE. Notice the PIDs in the 2nd demonstration where the workers persist.

        MCE::Child

        use strict; use warnings; use MCE::Child qw( ); use Time::HiRes qw( time ); my @testLists = qw( sun moon wind air ); my $opts = 'foo'; my $status = 0; sub regressions { my ($opts, $list) = @_; print "$$: $list\n"; sleep 1; # simulate processing return 1; } my $start = time(); MCE::Child->init( max_workers => 2, on_finish => sub { my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data ) + = @_; if ( $exit_code || $exit_signal || $core_dump ) { # Handle error. } $status += $data->{ status }; } ); for my $list ( @testLists ) { MCE::Child->create(sub { my $status = regressions( $opts, $list ); return { status => 1 }; }); } MCE::Child->wait_all(); print "status: $status\n"; printf "duration: %0.3f seconds\n", time() - $start; __END__ 27481: sun 27482: moon 27483: wind 27484: air status: 4 duration: 2.013 seconds

        MCE

        use strict; use warnings; use MCE::Loop qw( ); use Time::HiRes qw( time ); my @testLists = qw( sun moon wind air ); my $opts = 'foo'; my $status = 0; sub regressions { my ($opts, $list) = @_; print "$$: $list\n"; sleep 1; # simulate processing return 1; } my $start = time(); MCE->new( max_workers => 2, chunk_size => 1, input_data => \@testLists, gather => sub { $status += $_[0] }, user_func => sub { my $list = $_; my $status = regressions( $opts, $list ); MCE->gather( $status ); }, )->run(); print "status: $status\n"; printf "duration: %0.3f seconds\n", time() - $start; __END__ 27562: sun 27563: moon 27562: wind 27563: air status: 4 duration: 2.005 seconds

        The "$pm->run_on_finish" method takes 2 additional arguments; signal_code and core_dump. I tried the following variation, based on ikegami's solution.

        Parallel::ForkManager

        use strict; use warnings; use Parallel::ForkManager qw( ); use Time::HiRes qw( time ); my @testLists = qw( sun moon wind air ); my $opts = 'foo'; my $status = 0; sub regressions { my ($opts, $list) = @_; print "$$: $list\n"; sleep 1; # simulate processing return 1; } my $start = time(); my $pm = Parallel::ForkManager->new( 2 ); # P::FM by default, only waits for its own child processes, # which is what we want. # Refer to 'BLOCKING CALLS' in the module documentation. # Let's decrease the sleep period time (default 1.0). $pm->set_waitpid_blocking_sleep(0.25); $pm->run_on_finish(sub { my ( $pid, $exit_code, $ident, $exit_signal, $core_dump, $data ) = +@_; if ( $exit_code || $exit_signal || $core_dump ) { # Handle error. } $status += $data->{ status }; }); for my $list ( @testLists ) { $pm->start and next; my $status = regressions( $opts, $list ); $pm->finish( 0, { status => $status } ); } $pm->wait_all_children(); print "status: $status\n"; printf "duration: %0.3f seconds\n", time() - $start; __END__ 27470: sun 27471: moon 27472: wind 27473: air status: 4 duration: 2.257 seconds

        One thing that is not clear to me, how it will handle the running of regressions in case number of lists in @teslLists is odd (i.e 3 or 5 or 7). because at last, only one list will be left. so will below line will run only last list even though we have passed 2 to run in parallel.

        my $pm = Parallel::ForkManager->new( 2 );

        Thank you, let me try!

Re: running a function for different list parallely using fork
by tybalt89 (Monsignor) on Mar 15, 2023 at 10:51 UTC
    #!/usr/bin/perl use strict; # https://perlmonks.org/?node_id=11150996 use warnings; my @testLists = 1 .. 7; # FAKE for testing my $opts = 'opts'; # FAKE for testing my %pids; # NOTE changed to hash because children finish in any order foreach my $list (@testLists) { while( keys %pids >= 2 ) { delete $pids{ +wait }; } if( my $pid = fork ) { $pids{ $pid }++; } elsif( defined $pid ) { my $status += regressions($opts, $list); exit; } else { die "fork failed $!"; } } foreach my $pid ( keys %pids ) { waitpid($pid, 0); } sub regressions # FIXME for testing { print time, " pid $$ starting on (@_)\n"; sleep 1 + int rand 3; print time, " pid $$ ending on (@_)\n"; }

    Sample Output:

    1678877305 pid 65651 starting on (opts 1) 1678877305 pid 65652 starting on (opts 2) 1678877307 pid 65651 ending on (opts 1) 1678877307 pid 65653 starting on (opts 3) 1678877308 pid 65652 ending on (opts 2) 1678877308 pid 65653 ending on (opts 3) 1678877308 pid 65654 starting on (opts 4) 1678877308 pid 65655 starting on (opts 5) 1678877310 pid 65655 ending on (opts 5) 1678877310 pid 65656 starting on (opts 6) 1678877311 pid 65654 ending on (opts 4) 1678877311 pid 65657 starting on (opts 7) 1678877312 pid 65656 ending on (opts 6) 1678877313 pid 65657 ending on (opts 7)

      Tybalt89's solution is neat. Here is a variation using MCE::Channel where the child sends the status to the manager process. The manager does a non-blocking read in the event the child crashed or exited early; e.g. not send the status.

      MCE::Channel

      #!/usr/bin/perl use strict; # https://perlmonks.org/?node_id=11150996 use warnings; use MCE::Channel; my @testLists = 1 .. 7; # FAKE for testing my $opts = 'opts'; # FAKE for testing my %pids; # NOTE changed to hash because children finish in any order my $status = 0; my $chnl = MCE::Channel->new(); foreach my $list (@testLists) { while( keys %pids >= 2 ) { delete $pids{ +wait }; my $val = $chnl->recv_nb(); $status += $val if defined($val); } if( my $pid = fork ) { $pids{ $pid }++; } elsif( defined $pid ) { my $status = regressions($opts, $list); $chnl->send($status); exit; } else { die "fork failed $!"; } } foreach my $pid ( keys %pids ) { waitpid($pid, 0); my $val = $chnl->recv_nb(); $status += $val if defined($val); } print "status: $status\n"; sub regressions # FIXME for testing { print time, " pid $$ starting on (@_)\n"; sleep 1 + int rand 3; print time, " pid $$ ending on (@_)\n"; return 1; } __END__ 1678939864 pid 27850 starting on (opts 1) 1678939864 pid 27851 starting on (opts 2) 1678939866 pid 27851 ending on (opts 2) 1678939866 pid 27852 starting on (opts 3) 1678939867 pid 27850 ending on (opts 1) 1678939867 pid 27853 starting on (opts 4) 1678939868 pid 27853 ending on (opts 4) 1678939868 pid 27854 starting on (opts 5) 1678939869 pid 27852 ending on (opts 3) 1678939869 pid 27855 starting on (opts 6) 1678939871 pid 27855 ending on (opts 6) 1678939871 pid 27854 ending on (opts 5) 1678939871 pid 27856 starting on (opts 7) 1678939872 pid 27856 ending on (opts 7) status: 7

      And here's the Forking::Amazing version. See Re: figuring out Parallel::ForkManager slots for the Forking::Amazing module, which I wrote because I didn't particularly care for Parallel::ForkManager. I think Forking::Amazing makes it clearer what runs in the child and what runs in the parent, and makes it easier to pass results from the child back to the parent.

      #!/usr/bin/perl use strict; # https://perlmonks.org/?node_id=11150996 use warnings; use Forking::Amazing; my @testLists = 1 .. 10; # FAKE for testing my $opts = 'opts'; # FAKE for testing my $status = 0; Forking::Amazing::run 2, # max forks sub { return [ regressions($opts, @_) ]; }, # child sub { $status += $_[1][0]; }, # parent @testLists; # argument for each fork print "status $status\n"; sub regressions # FIXME for testing { print time, " pid $$ starting on (@_)\n"; select undef, undef, undef, 1 + rand 3; print time, " pid $$ ending on (@_)\n"; return pop; }

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: perlquestion [id://11150996]
Approved by marto
Front-paged by Corion
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others perusing the Monastery: (5)
As of 2024-04-25 13:17 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found