test_hashmap.pl:

#!/usr/bin/perl
use strict;
use warnings;
use Test::More qw( no_plan );
use Data::Dumper;
use Grep;

# several different versions of grep, built up functionally.

my $slow_matches_b = sub {
    sleep 1;
    return unless $_[0];
    return 1 if $_[0] =~ /[abc]/;
    return;
};

my $justafew_letters = [ ('a'..'c') x 3 ];
my $lotsa_letters    = [ ('a'..'z') x 3 ];

for ( $justafew_letters, $lotsa_letters ) { testem($_) }

sub testem {
    my $test_strings = shift or die "no test strings";
    print "test strings: @$test_strings\n";

    my $parallel_tests = [
        {   testname => 'threads',
            function => sub { Grep::threadgrep( $_[0], $_[1] ) },
        },
        {   testname => 'fork with many DBM::Deep dbs',
            function => sub { Grep::fork_manydbs_grep( $_[0], $_[1] ) },
        },
        {   testname => 'fork with 1 DBM::Deep db',
            function => sub { Grep::fork_onedb_grep( $_[0], $_[1] ) },
        },
        #{   testname => 'simple serial map',
        #    function => sub { Grep::slowgrep( $_[0], $_[1] ) },
        #},
    ];

    my $max_seconds_parallel = 2;    # parallel execution should speed things up

    for (@$parallel_tests) {
        my $timestarted = time;
        my $test_name   = $_->{testname};
        my $matches     = $_->{function}->( $slow_matches_b, $test_strings );
        my $timeelapsed = time - $timestarted;
        my $num_matches = @$matches;
        ok( $num_matches == 9,
            "$test_name, matches: $num_matches -- @$matches -- should be 9" );
        ok( $timeelapsed <= $max_seconds_parallel,
            "$test_name, time elapsed: $timeelapsed, wanted max seconds: $max_seconds_parallel" );
    }
}

Grep.pm:

package Grep;
use strict;
use warnings;
use Data::Dumper;
use Map;

# grep can be parallelized by building it on top of map_parallel,
# which uses forks, threads, distributed computation with MapReduce,
# or some such black magic.
# In some cases this may be faster, but not always;
# it depends where your bottleneck is.
# Whatever black magic is going on in the background,
# by abstracting it out, the code we get is clean and easy to read.

sub threadgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_threads(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

sub fork_onedb_grep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks_onedb(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

sub fork_manydbs_grep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks_manydbs(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

# Or you could do it in a non-forked/threaded/distributed/whatever way,
# by basing it on the conceptually simpler hashmap_serial (here, its
# disk-backed variant hashmap_serialized).

sub slowgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_serialized(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

sub flexygrep {
    my $test_function    = shift;
    my $hashmap_function = shift;
    my $in_array         = shift;

    my $in_hash = Map::hash_from_array($in_array);

    my $result_hash = $hashmap_function->( $test_function, $in_hash );

    my $out_array = [];
    for my $key ( keys %$result_hash ) {
        if ( my $out_true = $result_hash->{$key}->{out} ) {
            push @$out_array, $result_hash->{$key}->{in};
        }
    }
    return $out_array;
}

1;
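Before reading Map.pm, here's the shape of the abstraction: flexygrep turns the input array into a hash of { index => { in => value } } slots, hands that to whichever map function you picked to fill in an out slot per key, then keeps the in values whose out is true. A minimal sketch of calling it directly (the matcher and strings here are made up for illustration; note slowgrep writes its DBM::Deep file under c:/tmp):

use Grep;
my $matches_vowel = sub { $_[0] =~ /[aeiou]/ };
my $matched = Grep::slowgrep( $matches_vowel, [qw(foo bar xyz)] );
print "@$matched\n";    # foo bar -- in no guaranteed order, since hash keys come back unordered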
Map.pm:

package Map;
use strict;
use warnings;
use threads;
use Data::Dumper;
use File::Path qw(mkpath);
use File::Temp qw(tempdir tempfile);
use DBM::Deep;
use Parallel::ForkManager;

# Black magic for doing stuff in parallel is encapsulated here.
# use MapReduce; -- not yet, but it's on the list.

sub hash_from_array {
    my $array = shift;
    my $hash;
    for my $index ( 0 .. $#$array ) {
        $hash->{$index}->{in} = $array->[$index];
    }
    return $hash;
}

# Input is a function (eg, my $multiply_by_ten = sub { return $_[0] * 10 })
# and a hash like
#   my $input_values = { blee => { in => 1 },
#                        blah => { in => 2 },
#                      };
# Output is a hash like
#   { blee => { in => 1, out => 10 },
#     blah => { in => 2, out => 20 },
#   }
sub hashmap_serial {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } ( keys %$hash );

    # hash keys are processed in whatever order
    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "result for $in is $out\n";
        $hash->{$key}->{out} = $out;
    }
    return $hash;
}

# Does the same thing as hashmap_serial, but saves the value on the
# hard drive.
# ("Serialized" in this context means a memory value gets put on the hard
# disk, not to be confused with the sense of "serial as opposed to parallel".)
sub hashmap_serialized {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } ( keys %$hash );

    my $dir = "c:/tmp/map_serialized";
    mkpath($dir) unless -d $dir;
    die "no directory: $dir" unless -d $dir;
    my $file = "$dir/$$.db";

    my $db = DBM::Deep->new($file);

    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        $hash->{$key}->{out} = $out;
    }
    # store the completed hash; storing it before the loop would save a
    # copy without the outs, since DBM::Deep copies on assignment
    $db->{result} = $hash;

    #unlink $file;
    #die "couldn't delete file" if -f $file;
    return $hash;
}

# uses many DBM::Deep stores, along with forks
# works, but slow
sub hashmap_parallel_forks_manydbs {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } ( keys %$hash );

    my $dir = tempdir();

    my $pm = Parallel::ForkManager->new(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in   = $hash->{$key}->{in};
        my $out  = $function->($in);
        my $file = "$dir/$key";
        #print "file: $file\n";
        my $db = DBM::Deep->new($file);
        $db->{result} = $out;
        #print "in $in, out $out\n";
        $pm->finish;
    }
    $pm->wait_all_children;

    for my $key ( keys %$hash ) {
        my $file = "$dir/$key";
        my $db   = DBM::Deep->new($file);
        defined( my $out = $db->{result} )
            or die "no result in $file for key $key";
        $hash->{$key}->{out} = $out;
    }
    #print "hash: " . Dumper($hash);
    return $hash;
}

# tries to use one locked DBM::Deep store, along with forks
# doesn't work
sub hashmap_parallel_forks_onedb {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } ( keys %$hash );

    my ( $fh, $file ) = tempfile();
    my $db = DBM::Deep->new( file => $file, locking => 1 );

    my $pm = Parallel::ForkManager->new(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "in $in, out $out\n";
        #$db->lock();
        $db->{result}->{$key}->{in}  = $in;
        $db->{result}->{$key}->{out} = $out;
        #$db->unlock();
        $pm->finish;
    }
    $pm->wait_all_children;

    my $result = $db->{result};
    #print "result: " . Dumper($result);
    return $result;
    #return $hash;
}
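# An alternative sketch, not part of the original tests: rather than have
# the children share one DBM::Deep store, Parallel::ForkManager (0.7.6 and
# later) can ship a data structure from each child back to the parent via
# $pm->finish plus a run_on_finish callback, which sidesteps the shared-file
# problem above. Untested here; assumes each child's result fits in memory.
sub hashmap_parallel_forks_ipc {
    my $function = shift;
    my $hash     = shift;

    my $pm = Parallel::ForkManager->new(10);

    # collect each child's { key => { in => ..., out => ... } } slice
    my %results;
    $pm->run_on_finish( sub {
        my ( $pid, $exit_code, $ident, $signal, $core, $data ) = @_;
        %results = ( %results, %$data ) if $data;
    } );

    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        # the second argument to finish is serialized and handed to the parent
        $pm->finish( 0, { $key => { in => $in, out => $out } } );
    }
    $pm->wait_all_children;

    return {%results};
}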
# works
sub hashmap_parallel_threads {
    my $function = shift;
    my $hash     = shift;

    my @threads;
    for my $key ( keys %$hash ) {
        my $in = $hash->{$key}->{in};
        # use a lexical loop variable rather than $_, so the closure can't
        # see a later value of the (global) loop variable
        my $t = threads->create( sub { map_element( $key, $function, $in ) } );
        push @threads, $t;
    }

    # wait for threads to return (this implementation is bound by the slowest thread)
    my %results = map { %{ $_->join() } } @threads;
    #print Dumper \%results;
    return {%results};
}

sub map_element {
    my $key      = shift;
    my $function = shift;
    my $in       = shift;
    my $out      = $function->($in);
    return { $key => { in => $in, out => $out } };
}

1;
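And a quick, runnable demonstration of the data shapes Map.pm passes around (the function and numbers here are made up for illustration):

use Map;
use Data::Dumper;
my $square = sub { $_[0] * $_[0] };
my $hash   = Map::hash_from_array( [ 3, 4 ] );
# $hash is { 0 => { in => 3 }, 1 => { in => 4 } }
my $result = Map::hashmap_serial( $square, $hash );
# $result is { 0 => { in => 3, out => 9 }, 1 => { in => 4, out => 16 } }
print Dumper($result);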