http://www.perlmonks.org?node_id=580608

This is a followup to

Using functional programming to reduce the pain of parallel-execution programming (with threads, forks, or name your poison)

In that post, I was attempting to create numerous variations of a function builder I'm calling hashmap_parallel.

This function builder should, in theory, let me implement many of the utilities in List::MoreUtils in a transparently parallel way. E.g., instead of using flexygrep to build a parallel grepping function out of a parallel mapping function and an element test, as shown below, I would use flexy_any, flexy_all, flexy_pairwise, flexy_mesh, or what have you. These could all be built on top of diverse custom hashmap_parallel implementations, which you could plug in at will depending on your resources and the particular bottleneck of the computation you are faced with.
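
To make this concrete, here is a rough sketch of the shape a flexy_any builder might take. None of the flexy_* functions exist yet; this just follows the same pattern as flexygrep below and assumes the hash_from_array helper from Map.pm:

# Sketch only: any hashmap function (serial, threaded, forked, distributed)
# can be plugged in.  flexy_any returns true if any element's test result
# is true (note: the map is computed for all elements first, so it does
# not short-circuit).
sub flexy_any {
    my $test_function    = shift;
    my $hashmap_function = shift;
    my $in_array         = shift;

    my $in_hash     = Map::hash_from_array($in_array);
    my $result_hash = $hashmap_function->($test_function, $in_hash);

    for my $key (keys %$result_hash) {
        return 1 if $result_hash->{$key}->{out};
    }
    return 0;
}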

So, if you have multiple processors, you can take advantage of that without cluttering up your code. And if you happen to have a cluster of 1000 commodity boxes, perhaps you too will one day be able to sort a 10^10 element array in under two minutes, with a simple call to distributed_sort($cmp_function, $my_big_list).

Since posting the above meditation, I have been able to implement hashmap_parallel in two ways.

1) threads (with help from LanceDeeply)
2) forks, along with multiple DBM::Deep data stores (using Parallel::ForkManager on tilly's advice, using DBM::Deep because... well, I couldn't think of any other way to do it that would work for any type of data.)

In addition, I have a simple "serial hashmap" function that I can plug into my "flexygrep" grep builder, as a sanity check.

At present, the threading solution is faster. I attribute this to the hard drive reads and writes required by the many-DBM::Deep-files solution.

I was hoping to also implement a solution using a single DBM::Deep db and locking, which should at least reduce the read time. However, I was unable to get it working. My test matches a collection of letters against /[abc]/, with each test made after sleeping for one second (the delay gives us an artificial "reason" to want to take advantage of parallelism). There should be 9 matches in each case.

perl test_hashmap.pl
test strings: a b c a b c a b c
ok 1 - threads, matches: 9 -- a a b c c b b a c -- should be 9
ok 2 - threads, time elapsed: 2, wanted max seconds: 2
ok 3 - fork with many dbm::deep dbs, matches: 9 -- a a b c c b b a c -- should be 9
ok 4 - fork with many dbm::deep dbs, time elapsed: 1, wanted max seconds: 2
Use of uninitialized value in join or string at test_hashmap.pl line 48.
not ok 5 - fork with 1 dbm::deep db, matches: 7 -- a b b b c c -- should be 9
#   Failed test 'fork with 1 dbm::deep db, matches: 7 -- a b b b c c -- should be 9'
#   in test_hashmap.pl at line 48.
ok 6 - fork with 1 dbm::deep db, time elapsed: 1, wanted max seconds: 2
test strings: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
ok 7 - threads, matches: 9 -- a b a c c b c b a -- should be 9
ok 8 - threads, time elapsed: 2, wanted max seconds: 2
ok 9 - fork with many dbm::deep dbs, matches: 9 -- a b a c c b c b a -- should be 9
not ok 10 - fork with many dbm::deep dbs, time elapsed: 9, wanted max seconds: 2
#   Failed test 'fork with many dbm::deep dbs, time elapsed: 9, wanted max seconds: 2'
#   in test_hashmap.pl at line 49.
not ok 11 - fork with 1 dbm::deep db, matches: 8 -- b c a c b c a b -- should be 9
#   Failed test 'fork with 1 dbm::deep db, matches: 8 -- b c a c b c a b -- should be 9'
#   in test_hashmap.pl at line 48.
not ok 12 - fork with 1 dbm::deep db, time elapsed: 9, wanted max seconds: 2
#   Failed test 'fork with 1 dbm::deep db, time elapsed: 9, wanted max seconds: 2'
#   in test_hashmap.pl at line 49.
1..12
# Looks like you failed 4 tests of 12.

In addition to the test failures for the single DBM::Deep file shown above, about 25% of the time the single DBM::Deep file case seems to lock up, and the test never completes.
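
For concreteness, here is a rough sketch of the sort of thing I have in mind for the single-db case: each forked child opens its own DBM::Deep handle on the shared file after the fork, and only holds the exclusive lock around its write. The sub name is just illustrative, and I have not gotten a variant of this to pass the tests, so treat it as a sketch rather than a solution:

use File::Temp qw(tempfile);
use DBM::Deep;
use Parallel::ForkManager;

# Sketch only -- one shared DBM::Deep file, one handle per child.
# The parent's handle is never shared across the fork; each child opens
# a fresh handle and takes the lock just for its own write.
sub hashmap_parallel_forks_onedb_sketch {
    my $function = shift;
    my $hash     = shift;

    my ( $fh, $file ) = tempfile();
    my $pm = Parallel::ForkManager->new(10);

    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);

        # fresh handle in the child, instead of reusing one created pre-fork
        my $db = DBM::Deep->new( file => $file, locking => 1 );
        $db->lock;
        $db->{result}->{$key} = { in => $in, out => $out };
        $db->unlock;

        $pm->finish;
    }
    $pm->wait_all_children;

    my $db = DBM::Deep->new( file => $file, locking => 1 );
    return $db->{result};
}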

I would be very interested in advice on how to do hashmap_parallel with a single DBM::Deep store. And if there are suggestions for other strategies I could try out, or changes to my function builders that would speed things up, I'd be much obliged.

Thanks.

test_hashmap.pl:

#!/usr/bin/perl
use strict;
use warnings;
use Test::More qw( no_plan );
use Data::Dumper;
use Grep; # several different versions of grep, built up functionally.

my $slow_matches_b = sub {
    sleep 1;
    return unless $_[0];
    return 1 if $_[0] =~ /[abc]/;
};

my $justafew_letters = [ my @letters = (('a'..'c') x 3) ];
my $lotsa_letters    = [ (('a'..'z') x 3) ];

for ( $justafew_letters, $lotsa_letters ){ testem($_) }

sub testem {
    my $test_strings = shift or die "no test strings";
    print "test strings: @$test_strings\n";

    my $paralell_tests = [
        { testname => 'threads',
          function => sub { Grep::threadgrep( $_[0], $_[1]) }
        },
        { testname => 'fork with many dbm::deep dbs',
          function => sub { Grep::fork_manydbs_grep($_[0], $_[1]) }
        },
        { testname => 'fork with 1 dbm::deep db',
          function => sub { Grep::fork_onedb_grep($_[0], $_[1]) }
        },
        #{ testname => 'simple serial map',
        #  function => sub { Grep::slowgrep($_[0], $_[1]) }
        #}
    ];

    my $max_seconds_parallel = 2; # parallel execution should speed things up

    for ( @$paralell_tests ) {
        my $timestarted = time;
        my $test_name   = $_->{testname};
        my $matches     = $_->{function}->($slow_matches_b, $test_strings);
        my $timeelapsed = time - $timestarted;
        my $num_matches = @$matches;
        ok( $num_matches == 9,
            "$test_name, matches: $num_matches -- @$matches -- should be 9");
        ok( $timeelapsed <= $max_seconds_parallel,
            "$test_name, time elapsed: $timeelapsed, wanted max seconds: $max_seconds_parallel");
    }
}
#my ($timestarted, $timeelapsed);

Grep.pm:

package Grep;
use strict;
use warnings;
use Data::Dumper;
use Map;

# grep can be parallelized by building it on top of map_parallel
# which uses forks, threads, distributed computations with MapReduce
# or some such black magic
# in some cases this may be faster, but not always,
# it depends where your bottleneck is.
# Whatever black magic is going on in the background,
# by abstracting it out, the code we get is clean and easy to read.
sub threadgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_threads(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

sub fork_onedb_grep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks_onedb(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

sub fork_manydbs_grep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks_manydbs(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

# or you could do it in a non-forked/threaded/distributed/whatever
# way, by basing it on the conceptually simpler function map_serial.
sub slowgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_serialized(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

sub flexygrep {
    my $test_function    = shift;
    my $hashmap_function = shift;
    my $in_array         = shift;

    my $in_hash = Map::hash_from_array($in_array);

    my $result_hash = $hashmap_function->($test_function, $in_hash);

    my $out_array = [];
    for my $key (keys %$result_hash) {
        if ( my $out_true = $result_hash->{$key}->{out} ) {
            push @$out_array, $result_hash->{$key}->{in};
        }
    }
    return $out_array;
}

1;

Map.pm:

package Map;
use strict;
use warnings;

# Black magic for doing stuff in parallel is encapsulated here
# use MapReduce; -- not yet, but it's on the list.
use Data::Dumper;

sub hash_from_array {
    my $array = shift;
    my $hash;
    for my $index (0..$#$array) {
        $hash->{$index}->{in} = $array->[$index];
    }
    return $hash;
}

# input is a function (eg, my $sub_multiply_by_ten = sub { return $_[0] * 10 } ), and
# a hash like
#   my $input_values = { blee => { in => 1 },
#                        blah => { in => 2 }
#                      }
# output is a hash like
#   { blee => { in => 1, out => 10 },
#     blah => { in => 2, out => 20 }
#   }
sub hashmap_serial {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    # hash keys are processed in whatever order
    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "result for $in is $out\n";
        $hash->{$key}->{out} = $out;
    }
    return $hash;
}

# does the same thing as hashmap_serial
# but saves the value on the hard drive
# (serialized in this context means a memory value gets put on the hard disk,
# not to be confused with the sense of "serial as opposed to parallel")
sub hashmap_serialized {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    use File::Path qw(mkpath);
    my $dir = "c:/tmp/map_serialized";
    mkpath($dir) unless -d "$dir";
    die "no directory: $dir" unless -d "$dir";
    my $file = "$dir/$$.db";

    my $db = DBM::Deep->new( $file );
    $db->{result} = $hash;

    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        $hash->{$key}->{out} = $out;
    }
    #unlink $file;
    #die "couldn't delete file" if -f $file;
    return $hash;
}

# uses many DBM::Deep stores, along with forks
# works
# but slow
sub hashmap_parallel_forks_manydbs {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    use File::Temp qw(tempdir);
    my $dir = tempdir();

    use Parallel::ForkManager;
    use DBM::Deep;
    my $pm = new Parallel::ForkManager(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);

        my $file = "$dir/$key";
        #print "file: $file\n";
        my $db = DBM::Deep->new( $file );
        $db->{result} = $out;
        #print "in $in, out $out\n";
        $pm->finish;
    }
    $pm->wait_all_children;

    for my $key ( keys %$hash ) {
        my $file = "$dir/$key";
        my $db = DBM::Deep->new( $file );
        defined( my $out = $db->{result} )
            or die "no result in $file for key $key";
        $hash->{$key}->{out} = $out;
    }
    #print "hash: " . Dumper($hash);
    return $hash;
}

# tries to use one locked DBM::Deep store, along with forks
# doesn't work
sub hashmap_parallel_forks_onedb {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    use File::Temp qw(tempfile);
    my ($fh, $file) = tempfile();
    use DBM::Deep;
    my $db = DBM::Deep->new( file => $file, locking => 1 );

    use Parallel::ForkManager;
    my $pm = new Parallel::ForkManager(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "in $in, out $out\n";
        #$db->lock();
        $db->{result}->{$key}->{in}  = $in;
        $db->{result}->{$key}->{out} = $out;
        #$db->unlock();
        $pm->finish;
    }
    $pm->wait_all_children;

    my $result = $db->{result};
    #print "result: " . Dumper($result);
    return $result;
    #return $hash;
}

# works
sub hashmap_parallel_threads {
    my $function = shift;
    my $hash     = shift;

    use threads;
    my @threads;
    for ( keys %$hash ) {
        my $in = $hash->{$_}->{in};
        my $t  = threads->create( sub { map_element($_, $function, $in ) } );
        push @threads, $t;
    }
    # wait for threads to return ( this implementation is bound by slowest thread )
    my %results = map { %{ $_->join() }; } @threads;
    #print Dumper \%results;
    return {%results};
}

sub map_element {
    my $key      = shift;
    my $function = shift;
    my $in       = shift;
    my $out      = $function->($in);
    return { $key => { in => $in, out => $out } };
}

1;

Re: Using DBM::Deep and Parallel::ForkManager for a generalized parallel hashmap function builder (followup to "reducing pain of parallelization with FP")
by perrin (Chancellor) on Oct 25, 2006 at 18:20 UTC
    You can try replacing DBM::Deep with data serialized by Storable and shared in a simple MySQL table. You could also try sharing it via Cache::FastMmap, but be careful, since it's a cache and can silently drop data.
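
    A rough sketch of the Cache::FastMmap variant (untested; the sub name and the share_file path are just examples, and since it's a cache you should verify that every key came back):

    use Cache::FastMmap;
    use Parallel::ForkManager;

    # Sketch only: share results between forked workers through a mmap'ed
    # cache file instead of DBM::Deep.  Cache::FastMmap serializes the
    # hashref values with Storable for us.
    sub hashmap_parallel_forks_fastmmap {
        my ( $function, $hash ) = @_;

        my $cache = Cache::FastMmap->new(
            share_file => '/tmp/hashmap_results.fmm',
            init_file  => 1,
        );

        my $pm = Parallel::ForkManager->new(10);
        for my $key ( keys %$hash ) {
            $pm->start and next;
            my $in = $hash->{$key}->{in};
            $cache->set( $key, { in => $in, out => $function->($in) } );
            $pm->finish;
        }
        $pm->wait_all_children;

        for my $key ( keys %$hash ) {
            my $result = $cache->get($key);
            # it's a cache, so entries can be silently dropped -- check for holes
            defined $result or die "cache dropped the result for key $key";
            $hash->{$key} = $result;
        }
        return $hash;
    }
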
forked map
by LanceDeeply (Chaplain) on Oct 27, 2006 at 21:28 UTC
    hi-

    Instead of managing all that file-locking complexity, why not write the result for each element to its own file? Then there's no need for DBM::Deep: you can persist to different files without worrying about stepping on other results. (I used freeze/thaw to save complex data structures.)

    forked_map takes a dir where it temporarily persists results, a max number of workers (forked children) so you can tune it as you wish, the function to perform, and the data to perform it on. It uses the same pattern as Re: Run N similar tasks in parallel.
    use strict;
    use warnings;
    use Data::Dumper;
    use File::Path;
    use File::Spec;
    use Storable qw(freeze thaw);
    use POSIX ":sys_wait_h";

    my $slow_matches_b = sub {
        sleep 1;
        return unless $_[0];
        return 1 if $_[0] =~ /b/;
    };

    my $test_strings = [ ('blee','blah','bloo', 'qoo', 'fwee' ) ];

    my @results = forked_map( 'c:\\testdir', 2, $slow_matches_b, @$test_strings );
    print Dumper \@results;

    sub forked_map {
        my $tempdir      = shift;
        my $worker_count = shift;
        my $function     = shift;

        #
        # ensure tempdir is not some file
        #
        if ( -f $tempdir ) {
            die "$tempdir exists!";
        }

        #
        # delete dir, then recreate ( destroy any previous results )
        #
        unlink $tempdir;
        File::Path::mkpath( $tempdir );

        #
        # assign keys to each data element
        #
        my $id = 0;
        my %hash_input = map { ( $id++, $_ ) } @_;
        my %workers;

        #
        # loop block for assigning work to workers
        #
        {
            #
            # assign work to available workers
            #
            while ( keys %hash_input and keys %workers < $worker_count ) {
                $id = (keys %hash_input)[0];
                my $data = $hash_input{$id};
                delete $hash_input{$id};

                #
                # fork the child process
                #
                my $pid = fork;
                if ( ! defined $pid ) {
                    die "cannot fork: $!";
                }
                if ( $pid ) {
                    #
                    # track the pid in the parent process
                    #
                    $workers{$pid} = 'ACTIVE';
                }
                else {
                    #
                    # work in the child process
                    #
                    my $result = $function->($data);
                    my $frozen_result = freeze(\$result);

                    #
                    # save the results
                    #
                    my $tempfile = File::Spec->catfile($tempdir,$id);
                    open(my $fh, "> $tempfile") or die $!;
                    print $fh $frozen_result;

                    #
                    # bye bye baby
                    #
                    exit 0;
                }
            }

            #
            # wait for any child to complete
            #
            my $pid = wait();
            if ($pid == -1) {
                %workers = ();
            }
            else {
                delete $workers{$pid};
            }

            #
            # loop if there is any work remaining
            #
            redo if ( ( keys %hash_input ) or ( keys %workers ) );
        }

        #
        # read results from file
        #
        my %results;
        opendir(my $dh,$tempdir);
        for ( readdir($dh) ) {
            my $filepath = File::Spec->catfile($tempdir,$_);
            if ( -f $filepath ) {
                open(my $fh, $filepath ) or die $!;
                my $frozen_result = do { local $/; <$fh> };
                $results{$_} = ${thaw($frozen_result)};
            }
        }

        #
        # clean up after ourselves
        #
        unlink $tempdir;

        #
        # return results in order
        #
        return map { $results{$_} } sort keys %results;
    }

    Update: cleaned up the freeze/thaw usage.