Microcebus has asked for the wisdom of the Perl Monks concerning the following question:
Dear wise monks,
Sorry if this is a stupid question, but I'm a biologist rather than a coder... I want to analyze some biological data, but first I need to read and analyze 4 large files. To save time I would like to read the 4 files in parallel using Parallel::ForkManager. My problem is that I have no idea (even after a Google search) how to return the data back to the parent. Each subroutine generates a lot of different data structures, such as hashes and arrays, that I need later on in the parent process. Below is the code I am currently trying.
use Parallel::ForkManager;
$threads=4;
if($threads==1)
{
read_genome();
read_mapfile();
read_GTF();
read_RM();
}
else
{
@eval_code=("read_genome();","read_mapfile();","read_GTF();","read_RM();");
my$pm=new Parallel::ForkManager($threads);
foreach$eval_code(@eval_code)
{
$pm->start and next;
eval$eval_code;
$pm->finish;
}
$pm->wait_all_children;
}
sub read_genome
{
# do something
}
sub read_mapfile
{
# do something
}
sub read_GTF
{
# do something
}
sub read_RM
{
# do something
}
# use data generated in the subroutines
Re: Return all the data from child to parent with Parallel::Forkmanager
by Corion (Patriarch) on Aug 18, 2017 at 12:28 UTC
fork (and in turn, Parallel::ForkManager) doesn't return data to the parent process and has no provisions to do so.
The easiest approach would be to have each child write its results out to a file, and then read each file in turn in the parent. The run_on_finish callback conveniently lets you know when to collect more data in the parent:
use JSON::XS 'encode_json', 'decode_json';
$pm->run_on_finish(sub {
my ($pid, $exit_code, $filename) = @_;
print "** $filename was just written\n";
open my $fh, '<', $filename or die "Couldn't read child results from '$filename': $!";
binmode $fh;
local $/;
my $results = <$fh>;
# Convert results back to the data structure:
$results = decode_json( $results );
# now, merge those results with the other results you already have
});
...
for my $task (@tasks) {
my $filename = "some filename based on $task";
$pm->start( $filename ) and next;
... do the work
open my $fh, '>', $filename
or die "Couldn't write output file '$filename': $!";
binmode $fh;
print $fh encode_json(\%results);
};
fork (and in turn, Parallel::ForkManager) doesn't return data to the parent process and has no provisions to do so.
Nearly right. fork doesn't, but recent versions of Parallel::ForkManager (starting at 0.7.6, released 2010-Aug-15) do allow returning a single data structure, that must be a reference to a string, a hash, or an array. See "RETRIEVING DATASTRUCTURES from child processes" in the documentation of Parallel::ForkManager. What happens there is quite similar to your JSON approach: Parallel::ForkManager serialises the data in the child process using Storable, writes it to disk, and reads it back in the parent, all nearly transparent to the user.
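A minimal sketch of that mechanism might look like the following. This is an illustration, not the OP's actual code: the identifiers and the toy payload are made up, and it assumes Parallel::ForkManager 0.7.6 or later is installed.

```perl
use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(2);
my %results;

# The 6th callback argument is the reference the child passed to finish()
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $signal, $core, $data) = @_;
    $results{$ident} = $data if defined $data;
});

for my $ident (qw(genome mapfile)) {
    $pm->start($ident) and next;        # parent continues the loop
    my %data = ( source => $ident );    # stand-in for real parsing work
    $pm->finish(0, \%data);             # serialised with Storable behind the scenes
}
$pm->wait_all_children;

# %results now holds each child's hash, keyed by the identifier
```

After wait_all_children returns, the parent can use %results as if the subroutines had run in-process.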
Alexander
--
Today I will gladly share my knowledge and experience, for there are no sweeter words than "I told you so". ;-)
Re: Return all the data from child to parent with Parallel::Forkmanager
by QM (Parson) on Aug 18, 2017 at 13:18 UTC
While your question of child processes sending data to parents is perfectly reasonable, I would challenge the assumption that reading 4 large files in parallel will be any faster than reading them sequentially, unless they are on different disks in the same system.
Perl's IO is well-optimized, and I've generally found that the disk throughput is the bottleneck.
On another tack, if there are 4 different file types, and a different script is needed for each, then you may find some benefit in reading/reducing/writing out to interim files, which can be read by the final script. But the interim files should probably be 100 to 1000 times smaller than the originals. You will find network bandwidth the limiting factor unless the interim files are much smaller than the originals. There are several ways to serialize data that can be restored to native Perl data structures.
Perl Maven has a nice list of serializers to get you started.
Finally, DBM::Deep will store a Perl data structure on disk, as if it were in memory, but the access speed is much slower. I used this for an in-memory data structure that was larger than the virtual memory, and it worked very well.
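As a concrete example of one such serializer, the core Storable module (one assumed choice here; any format on that list would do) can round-trip a nested Perl structure through a byte string that is safe to write to an interim file:

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );   # core module, no CPAN install needed

# A nested structure like those built while parsing an interim file
my %parsed = (
    chr1 => { start => 100, end => 5000 },
    chr2 => { start => 250, end => 7500 },
);

my $frozen = freeze(\%parsed);    # byte string, suitable for a binmode'd file
my $copy   = thaw($frozen);       # restores an equivalent data structure

print $copy->{chr1}{end}, "\n";  # prints 5000
```

The same freeze/thaw pair is what Parallel::ForkManager uses internally when a child returns a data structure.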
-QM
--
Quantum Mechanics: The dreams stuff is made of
Re: Return all the data from child to parent with Parallel::Forkmanager
by tybalt89 (Monsignor) on Aug 18, 2017 at 17:40 UTC
Here's a way that avoids (for no particularly good reason) writing disk files by using perl's forked open. It also avoids eval (for maybe slightly better reasons).
#!/usr/bin/perl
# http://perlmonks.org/?node_id=1197603
use strict;
use warnings;
use Data::Dumper;
use Storable qw( freeze thaw );
use IO::Select;
my $sel = IO::Select->new;
my %returndata;
# this way you don't need to do an eval
my @subs = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
for my $sub (@subs) # start all forks
{
if(open my $fh, '-|')
{
$sel->add($fh);
$returndata{$fh} = '';
}
else # child
{
print freeze $sub->();
exit;
}
}
while( $sel->count ) # get return data
{
for my $fh ( $sel->can_read )
{
if( 0 >= sysread $fh, $returndata{$fh},
16 * 1024, length $returndata{$fh} )
{
my $answer = thaw delete $returndata{$fh};
$sel->remove($fh);
print Dumper $answer; # or whatever you want to do with it
}
}
}
sub read_genome
{
# do something
select undef, undef, undef, .1 + rand 1; # simulate processing time
return { from => 'read_genome', results => { 1..4} };
}
sub read_mapfile
{
# do something
select undef, undef, undef, .1 + rand 1; # simulate processing time
return { from => 'read_mapfile', results => { 5..8} };
}
sub read_GTF
{
# do something
select undef, undef, undef, .1 + rand 1; # simulate processing time
return { from => 'read_GTF', results => { 1..10} };
}
sub read_RM
{
# do something
select undef, undef, undef, .1 + rand 1; # simulate processing time
return { from => 'read_RM', results => { 2..5} };
}
Prints (in a different order each time it's run)
$VAR1 = {
'results' => {
'2' => 3,
'4' => 5
},
'from' => 'read_RM'
};
$VAR1 = {
'results' => {
'5' => 6,
'1' => 2,
'9' => 10,
'7' => 8,
'3' => 4
},
'from' => 'read_GTF'
};
$VAR1 = {
'from' => 'read_genome',
'results' => {
'3' => 4,
'1' => 2
}
};
$VAR1 = {
'from' => 'read_mapfile',
'results' => {
'7' => 8,
'5' => 6
}
};
#!/usr/bin/perl
use strict;
use warnings;
use feature 'say';
use Storable qw( freeze thaw );
use IO::Select;
my $threads = 4;
my $sel = IO::Select->new;
my %seldata;
my @name = ('read_genome', 'read_mapfile', 'read_GTF', 'read_RM');
my @task = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
my %ret;
if ($threads == 1) {
for my $id (0 .. $#task) {
$ret{$id} = {};
$ret{$id}{result} = eval { $task[$id]->() };
$ret{$id}{error} = $@;
}
}
else {
# start all forks
for my $id (0 .. $#task) {
if (open my $fh, '-|') {
$sel->add($fh); # parent
$seldata{$fh} = '';
}
else {
my $data = {}; # child
$data->{id} = $id;
$data->{result} = eval { $task[$id]->() };
$data->{error} = $@;
print freeze $data;
exit;
}
}
# acquire data
while ( $sel->count ) {
for my $fh ( $sel->can_read ) {
if ( 0 >= sysread $fh, $seldata{$fh},
16 * 1024, length $seldata{$fh} ) {
my $answer = thaw delete $seldata{$fh};
$sel->remove($fh);
$ret{ $answer->{id} } = {
result => delete $answer->{result},
error => delete $answer->{error}
};
}
}
}
}
sub read_genome {
# do something
return { 'aa' => 'bb' };
}
sub read_mapfile {
# do something
return { 'cc' => 'dd' };
}
sub read_GTF {
# do something
die 'exception';
return { 'ee' => 'ff' }; # not reached
}
sub read_RM {
# do something
return { 'gg' => 'hh' };
}
# use data generated in the subroutines
use Data::Dumper;
for my $id (0 .. $#task) {
say "## ", $name[$id];
if (length $ret{$id}{error}) {
say "ERROR: ", $ret{$id}{error};
next;
}
say Dumper($ret{$id}{result});
}
Output:
## read_genome
$VAR1 = {
'aa' => 'bb'
};
## read_mapfile
$VAR1 = {
'cc' => 'dd'
};
## read_GTF
ERROR: exception at j0.pl line 71.
## read_RM
$VAR1 = {
'gg' => 'hh'
};
Regards, Mario
Re: Return all the data from child to parent with Parallel::Forkmanager
by Anonymous Monk on Aug 18, 2017 at 13:48 UTC
@eval_code=("read_genome();","read_mapfile();","read_GTF();","read_RM();");
foreach$eval_code(@eval_code) {
eval$eval_code;
}
Yikes! Since other monks have answered your actual question, I'm just going to point out that there's a better way to do this:
my @subs = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
foreach my $sub (@subs) {
&$sub();
}
my $ok = eval {
read_genome();
read_mapfile();
read_GTF();
read_RM();
1;
};
if( ! $ok ) {
warn "Got error while reading: $@";
};
... because the whole purpose of this exercise was to try to read the files in parallel?
Re: Return all the data from child to parent with Parallel::Forkmanager
by marioroy (Prior) on Aug 19, 2017 at 04:01 UTC
Hi Microcebus,
You were close. Notice the arguments given to $pm->start and $pm->finish. start accepts an optional identifier for the process, and that same identifier is passed as the 3rd argument to the run_on_finish callback. In the demonstration that follows, I purposely have read_GTF die with an exception. The output is the same whether $threads equals 1 or 4.
Like afoken mentioned here, Parallel::ForkManager must be 0.7.6 or later for the demonstration to run.
use strict;
use warnings;
use feature 'say';
use Parallel::ForkManager;
my $threads = 4;
my @name = ('read_genome', 'read_mapfile', 'read_GTF', 'read_RM');
my @task = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
my (@ret, @err);
if ($threads == 1) {
for my $id (0 .. $#task) {
$ret[$id] = eval { $task[$id]->() };
$err[$id] = $@;
}
}
else {
my $pm = new Parallel::ForkManager($threads);
$pm->set_waitpid_blocking_sleep(0);
$pm->run_on_finish( sub {
my ($pid, $exit, $id, $signal, $core, $data) = @_;
$ret[$id] = delete $data->{ret};
$err[$id] = delete $data->{err};
});
for my $id (0 .. $#task) {
$pm->start($id) and next;
my $res = eval { $task[$id]->() };
$pm->finish(0, { ret => $res, err => $@ });
}
$pm->wait_all_children;
}
sub read_genome {
# do something
return { 'aa' => 'bb' };
}
sub read_mapfile {
# do something
return { 'cc' => 'dd' };
}
sub read_GTF {
# do something
die 'exception';
return { 'ee' => 'ff' }; # not reached
}
sub read_RM {
# do something
return { 'gg' => 'hh' };
}
# use data generated in the subroutines
use Data::Dumper;
for my $id (0 .. $#task) {
say "## ", $name[$id];
if (length $err[$id]) {
say "ERROR: ", $err[$id];
next;
}
say Dumper($ret[$id]);
}
Output:
## read_genome
$VAR1 = {
'aa' => 'bb'
};
## read_mapfile
$VAR1 = {
'cc' => 'dd'
};
## read_GTF
ERROR: exception at j1.pl line 54.
## read_RM
$VAR1 = {
'gg' => 'hh'
};
Regards, Mario
Hi again,
The following provides an MCE::Loop demonstration. This time, I am using a hash for storing the returned data. Workers pass data through socket handles inside MCE, so no temp files are made for sending data to the parent process.
use strict;
use warnings;
use feature 'say';
use MCE::Loop;
my $threads = 4;
my @name = ('read_genome', 'read_mapfile', 'read_GTF', 'read_RM');
my @task = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
my %ret;
if ($threads == 1) {
for my $id (0 .. $#task) {
$ret{"$id"} = eval { $task[$id]->() };
$ret{"$id:err"} = $@;
}
}
else {
MCE::Loop->init(
max_workers => $threads,
posix_exit => 1,
chunk_size => 1
);
%ret = mce_loop {
my $id = $_;
my $res = eval { $task[$id]->() };
MCE->gather( $id => $res, "$id:err" => $@ );
}
[ 0 .. $#task ];
MCE::Loop->finish;
}
sub read_genome {
# do something
return { 'aa' => 'bb' };
}
sub read_mapfile {
# do something
return { 'cc' => 'dd' };
}
sub read_GTF {
# do something
die 'exception';
return { 'ee' => 'ff' }; # not reached
}
sub read_RM {
# do something
return { 'gg' => 'hh' };
}
# use data generated in the subroutines
use Data::Dumper;
for my $id (0 .. $#task) {
say "## ", $name[$id];
if (length $ret{"$id:err"}) {
say "ERROR: ", $ret{"$id:err"};
next;
}
say Dumper($ret{$id});
}
Output:
## read_genome
$VAR1 = {
'aa' => 'bb'
};
## read_mapfile
$VAR1 = {
'cc' => 'dd'
};
## read_GTF
ERROR: exception at j2.pl line 45, <__ANONIO__> line 2.
## read_RM
$VAR1 = {
'gg' => 'hh'
};
Regards, Mario