Parallel::ForkManager and multiple datasets

by Speed_Freak (Sexton)
on Jul 05, 2018 at 13:13 UTC

Speed_Freak has asked for the wisdom of the Perl Monks concerning the following question:

I am trying to speed up an existing script by focusing on a loop that runs through individual files and returns values to an array and several hashes. I think this can be done using forks, but I'm not sure how to return the data from the children to the parent.

Reading the documentation, it says you can only return one data structure from each child, and to keep it small. Is it possible to return all of the values I've listed in the pseudo code below?

If so (and assuming it will involve building one data structure containing references to all of these), how would you go about that, and how would you dereference it once it's back in the parent? (I have struggled immensely with referencing/dereferencing!)

#pseudo code
use Parallel::ForkManager;

my $threads = 20;
my $pfm = new Parallel::ForkManager( $threads );

FOREACH:
foreach my $thing (keys %{$things_hr}) {
    my $pid = $pfm->start and next FOREACH;
    #do stuff
    push @names, $name;
    #do stuff
    $thing{$name}{a} = $value1;
    $thing{$name}{b} = $value2;
    #do stuff
    $list{$id}{$name}{b} = $b;
    $list{$id}{$name}{e} = $b1;
    $list{$id}{$name}{a} = 0;
    $list{$id}{$name}{c} = $ar->[$id][3];
    $list{$id}{$name}{i} = $ar->[$id][1];
    $list{$id}{$name}{j} = $ar->[$id][2];
    $list{$id}{$name}{k} = sprintf("%.4f", $ar->[$id][9]);
    $pfm->finish;
}
$pfm->wait_all_children;

Replies are listed 'Best First'.
Re: Parallel::ForkManager and multiple datasets
by bliako (Parson) on Jul 05, 2018 at 23:07 UTC

    In each child process, create as complex a data structure as you wish, totally contained within the child block. Then, when you are done processing, serialise the data structure using one of the many existing serialisers, e.g. Sereal.

    my $complex_data_structure = {
        'a' => [1,2,3],
        'b' => { 'c' => [4,5,6], 'd' => LWP::UserAgent->new() },
    };
    my $serialised_data = Sereal::Encoder::encode_sereal($complex_data_structure);
    $pfm->finish(0, \$serialised_data); # <<< note that we pass a reference to our serialised data

    The callback run_on_finish() is called every time a child is done processing. There we de-serialise our data via $data_structure_reference, like this:

    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
    my $data = Sereal::Decoder::decode_sereal($$data_structure_reference); ## de-reference the ref to the serialised data, then de-serialise

    Below is something to get you started. Note a few points: 1) how to get the pid of the child, 2) how to pass the data back via its reference. But the main point is that you serialise your complex data in the child as, let's say, one huge zipped string, and that is passed on to the parent process. I am not sure how well Sereal can handle references to objects created within the child and how well it can re-constitute them back in the parent.

    #!/usr/bin/env perl
    use strict;
    use warnings;

    use Parallel::ForkManager;
    use Data::Dump qw/dump/;
    # bliako
    use Sereal::Encoder qw(encode_sereal sereal_encode_with_object);
    use Sereal::Decoder qw(decode_sereal sereal_decode_with_object);

    my @names = ();
    my %list = ();
    my %thing = ();

    my $threads = 20;
    my $pfm = new Parallel::ForkManager( $threads );

    my %results = ();
    $pfm->run_on_finish( sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
        my $data = Sereal::Decoder::decode_sereal($$data_structure_reference);
        # surely this is sequential code here so no need to lock %results, right?
        $results{$pid} = $data;
        # using pid as key is not a good idea because maybe a pid number will be eventually recycled.
    });

    my $things_hr = {
        'job1' => 'this is job 1 data',
        'job2' => 'this is job 2 data',
        'job3' => 'this is job 3 data',
        'job4' => 'this is job 4 data',
        'job5' => 'this is job 5 data',
    };

    THELOOP:
    foreach my $thing (keys %{$things_hr}) {
        print "THING = $thing\n";
        $pfm->start and next THELOOP;
        my $pid = $$;
        my $returned_data = {
            'item1' => "item1 from pid $pid, for item $thing and value ".$things_hr->{$thing},
            'item2' => "item2 from pid $pid, for item $thing and value ".$things_hr->{$thing},
            "item3 are some array refs for pid: $pid" => [1,2,3,4],
        };
        my $serialised_data = Sereal::Encoder::encode_sereal($returned_data);
        print "pid=$pid, this is what I am sending:\n".dump($returned_data)."\n";
        $pfm->finish(0, \$serialised_data);
    }
    $pfm->wait_all_children;

    print "Here are the results:\n".dump(%results)."\n";

    bw, bliako

      Thanks! Working on trying this out now.

      In reading through this, I do see a problem that I'm not entirely sure how to handle. I have a multidimensional hash that is created in the loop, and values are pulled from it later in the script. Will I need to rewrite all the follow-on code to accommodate the extra layer of data ($pid)? Or is there a way to "push" each de-serialised chunk into the parent structure without changing the child structure?

      Won't this line:

          $results{$pid} = $data;

      turn this:

          $VAR1 = {
              'id_1' => {
                  'thing_1' => { 'a' => 1, 'b' => 4.5, 'c' => 1200 },
                  'thing_2' => { 'a' => 0, 'b' => 3.2, 'c' => 100 },
              },
              'id_2' => {
                  'thing_1' => { 'a' => 1, 'b' => 4.5, 'c' => 1200 },
                  'thing_2' => { 'a' => 0, 'b' => 3.2, 'c' => 100 },
              },
          };

      into something much more complex, since each child is forked on the list of things and then loops through a list of 1 million ids?

      The code has a for loop inside a for loop, and I am trying to fork at the main loop. This will generate around 200 child processes. The internal loop then repeats one million times. The data structure is keyed on the inner loop first, then the outer loop. So there are a million ids, around 200 things per id, and 6 or so placeholders per thing. I'm worried that adding the $pid into the mix, in front of the data structure for each child process, will add a ton of data to the hash.

        I am not sure I understand correctly what the challenge is. Each child must return its results as a data chunk independent of any other child's. The run_on_finish() sub receives each child's data and, in my example, puts all children's data together in a hash keyed on child pid (see the note below about that). Why? Because I had assumed you want to keep each child's results separate, since it is possible that child1 returns data with id=12 and so can child2. If that is not necessary, e.g. if each child returns results whose keys never clash with another child's, then fine, it is not set in stone; just merge the children's returned hashes into a larger hash like so:

        my %results = ();
        $pfm->run_on_finish( sub {
            my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
            my $data = Sereal::Decoder::decode_sereal($$data_structure_reference);
            # surely this is sequential code here so no need to lock %results, right?
            @results{keys %$data} = values %$data;
        });

        This will create a "flatter" hash without pid information, but there is the risk of key clashes: if child1's data contains key id=12 and child2's data also contains key id=12 (at the top level of their hashes), the new hash %results can of course hold only one value for it, and that will be whatever the last child returned.
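
        If the children's results do share top-level ids (as with a %list keyed on $id), one way to avoid that clobbering is to merge one level deeper instead of assigning whole top-level slots. A minimal sketch, assuming each child returns a hash shaped like $id => $name => {...} and that no two children produce the same ($id, $name) pair:

        $pfm->run_on_finish( sub {
            my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
            my $data = Sereal::Decoder::decode_sereal($$data_structure_reference);
            # merge per ($id, $name) so two children that share an $id do not overwrite each other
            for my $id (keys %$data) {
                for my $name (keys %{ $data->{$id} }) {
                    $results{$id}{$name} = $data->{$id}{$name};
                }
            }
        });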

        A nested hash is probably more efficient than a flat hash of 1 million items, at least as far as possible key collisions are concerned. Other Monks can correct me on that. In general, I would assume a hash with 1 million items is child's play for Perl.

        Note on using PIDs as hash keys: using a child's pid as the hash key to collect that child's results is not a good idea because pid numbers can be recycled by the OS, and two children at different times may get the same pid. A better idea is to assign each child its own unique id, drawn from a pool of unique ids and given to the child at fork just like its input data.
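
        One way to do that with Parallel::ForkManager itself is to pass an identifier to start(); run_on_finish() receives it back as its third argument ($ident). A minimal sketch, using the input key $thing as the identifier:

        $pfm->run_on_finish( sub {
            my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
            my $data = Sereal::Decoder::decode_sereal($$data_structure_reference);
            # $ident is whatever was passed to start(), so it stays meaningful even if pids are recycled
            $results{$ident} = $data;
        });

        foreach my $thing (keys %{$things_hr}) {
            $pfm->start($thing) and next;       # the input key doubles as the child's identifier
            # ... build and serialise the child's data as above ...
            $pfm->finish(0, \$serialised_data);
        }
        $pfm->wait_all_children;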

        Let me know if I got something wrong or if you have more questions.

        bw, bliako

Re: Parallel::ForkManager and multiple datasets
by tybalt89 (Parson) on Jul 09, 2018 at 04:04 UTC
    #!/usr/bin/perl
    # https://perlmonks.org/?node_id=1217948
    use strict;
    use warnings;

    use Data::Dump 'dd';
    use Storable qw( freeze thaw );
    use Time::HiRes qw( time sleep );
    use IO::Select;

    my $maxforks = 20;
    my @ids = 'job01' .. 'job38';               # it is cool that this works

    my $sel = IO::Select->new;
    my $start = time;
    my @answers;

    while( @ids or $sel->count )                # unstarted or active
      {
      while( @ids and $sel->count < $maxforks ) # start all forks allowed
        {
        my $id = shift @ids;
        if( open my $fh, '-|' )                 # forking open
          {
          $sel->add( $fh );                     # parent
          }
        else                                    # child code goes here
          {
          sleep rand;
          print freeze { id => $id, pid => $$, time => time - $start };
          exit;
          }
        }
      for my $fh ( $sel->can_read )             # collecting child data
        {
        local $/;
        push @answers, thaw <$fh>;
        $sel->remove( $fh );
        }
      }

    dd \@answers;

      Hi tybalt89,

      That looks like a fun demonstration and I gave it a spin. I commented out sleep and changed the input to 'job0001' .. 'job9999'. Then I tried MCE implementations, mainly to compare overhead. MCE::Loop and MCE::Flow are wantarray-aware and construct a gather option with a corresponding gather function automatically.

      chunk_size => 1

      #!/usr/bin/perl
      # https://perlmonks.org/?node_id=1217948
      use strict;
      use warnings;

      use Data::Dump 'dd';
      use Time::HiRes qw( time sleep );
      use MCE::Loop;

      my $maxforks = 20;
      my @ids = 'job0001' .. 'job9999';         # it is cool that this works
      my $start = time;

      MCE::Loop->init( max_workers => $maxforks, chunk_size => 1 );

      my @answers = mce_loop {
          my $id = $_;
          # sleep rand;
          my $ret = { id => $id, pid => $$, time => time - $start };
          MCE->gather($ret);
      } \@ids;

      MCE::Loop->finish;

      dd \@answers;

      chunk_size => 'auto'

      #!/usr/bin/perl
      # https://perlmonks.org/?node_id=1217948
      use strict;
      use warnings;

      use Data::Dump 'dd';
      use Time::HiRes qw( time sleep );
      use MCE::Loop;

      my $maxforks = 20;
      my @ids = 'job0001' .. 'job9999';         # it is cool that this works
      my $start = time;

      MCE::Loop->init( max_workers => $maxforks, chunk_size => 'auto' );

      my @answers = mce_loop {
          my @ret;
          for my $id ( @{ $_ } ) {
              # sleep rand;
              push @ret, { id => $id, pid => $$, time => time - $start };
          }
          MCE->gather(@ret);
      } \@ids;

      MCE::Loop->finish;

      dd \@answers;

      Disclaimer: this is comparing apples to oranges, because tybalt89's demonstration spawns a worker per element, whereas in the MCE demonstrations workers request the next input element(s) from the manager process.

      On my laptop (macOS and 9,999 iterations), tybalt89's example takes 16 seconds versus 0.6 and 0.5 seconds respectively for the MCE demonstrations.

      Kind regards, Mario

        Hi marioroy,

        As you said, it's apples to oranges, so let's try to be more appleish (or is it orangeish?).

        First attempt was to group job ids so a child does more than one during its lifetime. Turns out to be fairly simple.

        Right after I posted Re: Parallel::ForkManager and multiple datasets, I realized I had written roughly the same forking code several times, so it was time to move it to a module.

        Here's the module. It uses callbacks for the child code and for the parent code that processes the child's returned value.

        package Forking::Amazing;

        sub run ($&&@)
          {
          my ( $maxforks, $childcallback, $resultcallback, @ids ) = @_;
          use Storable qw( freeze thaw );
          use IO::Select;
          my %fh2id;
          my $sel = IO::Select->new;
          while( @ids or $sel->count )                # unstarted or active
            {
            while( @ids and $sel->count < $maxforks ) # start all forks allowed
              {
              my $id = shift @ids;
              if( open my $fh, '-|' )                 # forking open
                {
                $sel->add( $fh );                     # parent
                $fh2id{$fh} = $id;
                }
              else                                    # child code goes here
                {
                print freeze $childcallback->($id);
                exit;
                }
              }
            for my $fh ( $sel->can_read )             # collecting child data
              {
              $sel->remove( $fh );
              $resultcallback->($fh2id{$fh}, thaw do { local $/; <$fh> });
              }
            }
          }

        1;

        __END__

        =head1 EXAMPLE program

          use Forking::Amazing;                 # small example program
          use Data::Dump 'dd';

          Forking::Amazing::run(
            5,                                  # max forks
            sub { +{id => pop, pid => $$} },    # runs in child
            sub { dd pop },                     # process result of child in parent
            'a'..'z');                          # ids (one fork for each id)

        =cut

        The module name may change in the future. :)

        Here's code using that module that does grouping of job ids.
        The id passed to the child is now an anon array of job ids, and a child now returns an anon array of results.

        #!/usr/bin/perl
        use strict;
        use warnings;

        use Forking::Amazing;
        use Data::Dump 'dd';
        use Time::HiRes qw(time);

        my $groupsize = 1000;

        my @rawids = 'job0001' .. 'job9999';
        my @ids;
        push @ids, [ splice @rawids, 0, $groupsize ] while @rawids;

        my @answers;
        my $start = time;

        Forking::Amazing::run 20,
          sub { [ map +{id => $_, pid => $$, time => time - $start}, @{+shift} ] },
          sub { push @answers, @{+pop} },
          @ids;

        my $end = time - $start;

        dd \@answers;
        print "forking time $end\n";

        It's a significant speed up :)

        Note that I moved the dd out of the timing loop, since the dd takes over 1.5 seconds to run on my machine and swamps the forking time.

Re: Parallel::ForkManager and multiple datasets
by Speed_Freak (Sexton) on Jul 05, 2018 at 15:28 UTC

    I'm attempting the following, but so far am not successful:

    #pseudo code
    use Parallel::ForkManager;

    my @names = ();
    my %list = ();
    my %thing = ();

    my $threads = 20;
    my $pfm = new Parallel::ForkManager( $threads );

    $pfm->run_on_finish( sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
        %list  = $data_structure_reference->{%list};
        %thing = $data_structure_reference->{%thing};
        @names = $data_structure_reference->{@names};
    });

    FOREACH:
    foreach my $thing (keys %{$things_hr}) {
        my $pid = $pfm->start and next FOREACH;
        #do stuff
        push @names, $name;
        #do stuff
        $thing{$name}{a} = $value1;
        $thing{$name}{b} = $value2;
        #do stuff
        $list{$id}{$name}{b} = $b;
        $list{$id}{$name}{e} = $b1;
        $list{$id}{$name}{a} = 0;
        $list{$id}{$name}{c} = $ar->[$id][3];
        $list{$id}{$name}{i} = $ar->[$id][1];
        $list{$id}{$name}{j} = $ar->[$id][2];
        $list{$id}{$name}{k} = sprintf("%.4f", $ar->[$id][9]);
        $pfm->finish( 0, [ \%list, \%thing, \@names ] );
    }
    $pfm->wait_all_children;

    I'm getting errors about pseudo hashes being deprecated or not existing.
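
    For reference, since the finish() call above passes an array ref holding three refs, run_on_finish() would need to unpack that array ref by position and dereference each element. A minimal sketch of that unpacking (merging into the parent hashes rather than overwriting them is an assumption here):

    $pfm->run_on_finish( sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
        # finish() passed [ \%list, \%thing, \@names ], so unpack by position
        my ($list_ref, $thing_ref, $names_ref) = @{ $data_structure_reference };
        %list  = ( %list,  %{ $list_ref  } );   # merge each child's keys into the parent hashes
        %thing = ( %thing, %{ $thing_ref } );
        push @names, @{ $names_ref };
    });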

      I got this working, but only for a single data set. I forgot there was another foreach loop going on in one of the "do stuff" sections, which is creating a multidimensional array.

      The problem becomes, how do I return a multidimensional array in its entirety back to the parent?

      #pseudo code
      use Parallel::ForkManager;

      my @names = ();
      my %list = ();
      my %thing = ();

      my $threads = 20;
      my $pfm = new Parallel::ForkManager( $threads );

      $pfm->run_on_finish( sub {
          my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
          my $name = $data_structure_reference->{name};
          %list = $data_structure_reference->{data1};
          $thing{$name}{a} = $data_structure_reference->{thing1};
          $thing{$name}{b} = $data_structure_reference->{thing2};
          push @names, $data_structure_reference->{name};
      });

      FOREACH:
      foreach my $thing (keys %{$things_hr}) {
          my $pid = $pfm->start and next FOREACH;
          #do stuff
          push @names, $name;
          #do stuff
          $thing{$name}{a} = $value1;
          $thing{$name}{b} = $value2;
          #do stuff
          foreach $id (...) {
              $list{$id}{$name}{b} = $b;
              $list{$id}{$name}{e} = $b1;
              $list{$id}{$name}{a} = 0;
              $list{$id}{$name}{c} = $ar->[$id][3];
              $list{$id}{$name}{i} = $ar->[$id][1];
              $list{$id}{$name}{j} = $ar->[$id][2];
              $list{$id}{$name}{k} = sprintf("%.4f", $ar->[$id][9]);
          }
          $pfm->finish( 0, { data1 => %id, name => $sample_name, thing1 => $value1, thing2 => $value2, } );
      }
      $pfm->wait_all_children;

        Replace

        %list = $data_structure_reference->{data1};

        with

        %list = %{ $data_structure_reference->{data1} };

        and replace

        $pfm->finish( 0, { data1 => %id,

        with

        $pfm->finish( 0, { data1 => \%id,
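
        Putting those two replacements together, the affected lines would look roughly like this (a sketch of just those lines, not the whole script):

        # child: pass a reference to the hash, not the hash itself
        $pfm->finish( 0, { data1 => \%id, name => $sample_name,
                           thing1 => $value1, thing2 => $value2 } );

        # parent, inside run_on_finish(): dereference the hash ref back into %list
        %list = %{ $data_structure_reference->{data1} };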
