This is a followup from two recent posts:

Could there be ThreadedMapReduce (and/or ForkedMapReduce) instead of DistributedMapReduce?

using parallel processing to concatenate a string, where order of concatenation doesn't matter

In a sense it is a followup from an older post as well:

What is the fastest way to download a bunch of web pages?

The common theme to all these posts is doing something in parallel to accomplish a task, where each bit of the task is independent of the other bits.

It turns out that this is the essense of the MapReduce algorithm that google uses to build much of its most important code. Google uses cluster computing to accomplish this, but the same "parallelization" logic could also be used on a single computer running multiple processes, or even a single process with threads. (Ira Woodhouse has indicated that the next version of MapReduce released to the CPAN will probably have configuration options for single computer clusters in order to do exactly that.)

Currently, the canonical example of a situation where this would be useful for me, on my job, is to download a bunch of web pages and verify that each page regex-matches something that it should. In other words, grep, where part of the grep test is to download a web page.

Throughout my experience with perl, have had numerous other situations where this basic idea -- break down a task into components that can run in parallel, run them, and then reassemble the results -- would have been helpful. Sometimes I would do this, but because I'm not an experienced thread programmer, and sometimes I was on windows sometimes not, sometimes would have to recompile perl, etc etc, it was always an unpleasant experience. There were many occasions when I thought, maybe I could use parallelization here... but painful... not enough time to debug.... I'll just make do with having it be a little slower.

An example of a time when I could have, but didn't, use functional programming based on parallelization to speed things up, was when I had ~50,000 web pages to parse and reformat using a variety of functions based on HTML::TreeBuilder. None of the files cared what the other files looked like, so I could have processed multiples at the same time. But I got the basic system working serially, and it got the job done in a bit over 48 hours. This was too slow, so I did some simple things with forks and got it down to under a day, which was acceptable because the script only had to run once.

But I remember thinking the forking code the ugliest, and hardest to debug, in my program.

If I had had 5 million web pages instead of 50,000, and wanted to split the computation among multiple processors somehow, putting the job queue stuff together for it along the lines I had used till then, it would have been a nightmare. Even though all I was doing was grep, with a little network communication inside the grep test function.

Later when I came across the article

Why Threads are a Bad Idea

it all rang very true to me. Okay, I was working with forks, in a simple context, and this presentation is maybe more about threads with systems programming. But the basic difficulties apply to any situation where you are running stuff in an order that isn't guaranteed. It's different from running stuff through a simple, ordered, for loop in ways that keep quite subtle and hard to detect.

What frustrated me the most is the feeling that I couldn't encapsulate the logic that I want, that I have to keep writing the "ugly bits" again and again.

This was before I had heard of functional programming. Since then, I have been learning a lot about functional programming, and trying to incorporate it into my bag of tools. I originally became interested in this after reading Paul Graham's On Lisp. But I want to apply functional programming to make my life easier in perl. Joel Spolsky suggests in

Can Your Programming Language Do This

that functional programming is a good technique to hide the "ugly but important bits" of your code.

In the article, spolsky suggests that this is exactly what Google has done with their MapReduce algorithm. Google programmers can write code that says the equivalent of

my @results = distributed_mapreduce_grep ( $test_function, [ @in_array ]);

and this would do exactly what

my @results = grep { $test_function($) )} @array

would do in perl. Except that it works on a cluster. So you can process a lot more data, faster.

And the "ugly bits" are hidden.

I want to start hiding the ugly bits of my code, using functional programming.

The following is my attempt to do that.

Unfortunately it still isn't working. But I think it's an interesting read, and I'm also hoping someone can plug something in there that will make it work.

What's nice is that the "ugly bit" that isn't working is encapsulated. This is the function hashmap_parallel. Currently my "parallelization strategy" involves forking off processes and storing values in a DBM::Deep hard disk store. But actually I don't care about the implementation details, I just want it to work.


UPDATE: Thanks to LanceDeeply, I now have code that works, using threads for the parallelization.

I kept the function that doesn't work as hashmap_parallel_forks, which I am still hoping to get working. The code that does work is called hashmap_parallel_threads. The test script has also been updated accordingly.

If anyone else want to shoot me some candidates for their favorite way of implementing transparent parallelization with map, I will add them to the catalog.


Hashmap here means essentially the same as the "map" half in MapReduce. It processes a set, where order doesn't matter. I have several hashmap functions in this code, two of which work, and one (the one that executes in parallel) which doesn't.

These "mapping" functions are given as an argument to the function builder flexygrep, which returns a grepping function. So, as a consequence two of my grepping functions work and one (the parallel one) doesn't.

If I can get hashmap_parallelto work, I'm thinking could theoretically use this to build other functions, like sort_parallel, permute_parallel, you name it. Sometimes, for efficiency reasons, this will make sense. A lot of the time it won't. Depends what your bottleneck is -- cpu, memory, network, disk io, etc. But the good news is that once the parallel mapping function -- or mapping functions -- work, you can just plug them in and try. A lot easier than writing threading code for all scenarios.

Again, the bit that I need to get to work -- but which will pay major dividends in maintainability when I do -- is hashmap_parallel.

Now, here's a little test output.

$ ./ ok 1 - parallel-y threadgrep works not ok 2 - parallel-y forkgrep works # Failed test 'parallel-y forkgrep works' # in ./ at line 21. ok 3 - serially executing code works 1..3 # Looks like you failed 1 test of 3.
And here's the code
test_hashmap: #!/usr/bin/perl use strict; use warnings; use Test::More qw( no_plan ); use Data::Dumper; use Grep; my $slow_matches_b = sub { sleep 1; return unless $_[0]; return 1 if $_[0] =~ /b/; }; my $test_strings = [ ('blee','blah','bloo', 'qoo', 'fwee' ) ]; my $matches; $matches = Grep::threadgrep( $slow_matches_b, $test_strings ); ok( @$matches == 3, "parallel-y threadgrep works" ); # should get blee, blah bloo, but not fwee or qoo $matches = Grep::forkgrep( $slow_matches_b, $test_strings ); ok( @$matches == 3, "parallel-y forkgrep works" ); $matches = Grep::slowgrep( $slow_matches_b, $test_strings ); ok( @$matches == 3, "serially executing code works" ); package Grep; use strict; use warnings; use Data::Dumper; use Map; # grep can be parallelized by building it on top of map_parallel # which uses forks, threads, distributed computations with MapReduce # or some such black magic # in some cases this may be faster, but not always, # it depends where your bottleneck is. # Whatever black magic is going on in the background, # by abstracting it out, the code we get is clean and easy to read. sub threadgrep { my $test_function = shift; my $in_array = shift; my $map_function = sub { Map::hashmap_parallel_threads(@_)}; return flexygrep($test_function, $map_function, $in_array); } sub forkgrep { my $test_function = shift; my $in_array = shift; my $map_function = sub { Map::hashmap_parallel_forks(@_)}; return flexygrep($test_function, $map_function, $in_array); } # or you could do it in a non-forked/threaded/distributed/whatever # way, by basing it on the conceptually simpler function map_serial. sub slowgrep { my $test_function = shift; my $in_array = shift; my $map_function = sub { Map::hashmap_serialized(@_)}; return flexygrep($test_function, $map_function, $in_array); } sub flexygrep { my $test_function = shift; my $hashmap_function = shift; my $in_array = shift; my $in_hash = Map::hash_from_array($in_array); my $result_hash = $hashmap_function->($test_function, $in_hash); my $out_array = []; for my $key (keys %$result_hash) { if ( my $out_true = $result_hash->{$key}->{out} ) { push @$out_array, $result_hash->{$key}->{in} } } return $out_array; } 1; package Map; use strict; use warnings; # Black magic for doing stuff in parallel is encapsulated here # use MapReduce; use Parallel::ForkManager; use threads; # use threads::shared qw(is_shared); use DBM::Deep; use Data::Dumper; sub hash_from_array { my $array = shift; my $hash; for my $index (0..$#$array) { $hash->{$index}->{in} = $array->[$index]; } return $hash; } # input is a funcion (eg, my $sub_multiply by ten = { return $_[0] * 1 +0 } ), and # a hash like # my $input_values = { blee => { in => 1 }, # blah => { in => 2} # } # output is a hash like #{ blee => { in => 1, out => 10 }, # blah => { in => 2, out => 20 } #} sub hashmap_serial { my $function = shift; my $hash = shift; die "bad hash" . Dumper($hash) if grep { ! defined($hash->{$_}->{in} +) } (keys %$hash); # hash keys are processed in whatever order for my $key ( keys %$hash) { my $in = $hash->{$key}->{in}; my $out = $function->($in); #print "result for $in is $out\n"; $hash->{$key}->{out} = $out; } return $hash; } # does the same thing as hashmap_serial # but saves the value on the hard drive # (serialized in this context means a memory value gets put on the har +d disk, # not to be confused with the sense of "serial as opposed to parallel" sub hashmap_serialized { my $function = shift; my $hash = shift; die "bad hash" . Dumper($hash) if grep { ! defined($hash->{$_}->{in} +) } (keys %$hash); use File::Path qw(mkpath); my $dir="c:/tmp/map_serialized"; mkpath($dir) unless -d "$dir"; die "no directory: $dir" unless -d "$dir"; my $file="$dir/$$.db"; my $db = DBM::Deep->new( $file ); $db->{result}=$hash; for my $key ( keys %$hash ) { my $in = $hash->{$key}->{in}; my $out = $function->($in); $hash->{$key}->{out} = $out; } #unlink $file; #die "couldn't delete file" if -f $file; return $hash; } # but uses threads to compute "out" values in a parallel way # doesn't work. sub hashmap_parallel_forks { my $function = shift; my $hash = shift; die "bad hash" . Dumper($hash) if grep { ! defined($hash->{$_}->{in} +) } (keys %$hash); return {}; use File::Path qw(mkpath); my $dir="c:/tmp/map_serialized"; mkpath($dir) unless -d "$dir"; die "no directory: $dir" unless -d "$dir"; my $file="$dir/$$.db"; my $db = DBM::Deep->new( $file ); $db->{result}=$hash; my $pm=new Parallel::ForkManager(10); for my $key ( keys %$hash ) { $pm->start and next; my $in = $hash->{$key}->{in}; my $out = $function->($in); print "in $in, out $out\n"; $hash->{$key}->{out} = $out; $pm->finish; } $pm->wait_all_children; print "hash: " . Dumper($hash); #unlink $file; #die "couldn't delete file" if -f $file; #die "forkgrep result: " . Dumper($hash); return $hash; } #works sub hashmap_parallel_threads { my $function = shift; my $hash = shift; my @threads; for ( keys %$hash ) { my $in = $hash->{$_}->{in}; my $t = threads->create( sub { map_element($_, $function, $in ) +} ); push @threads, $t; } # wait for threads to return ( this implementation is bound by s +lowest thread ) my %results = map { %{ $_->join() }; } @threads; #print Dumper \%results; return {%results}; } sub map_element { my $key = shift; my $function = shift; my $in = shift; my $out = $function->($in); return { $key => { in => $in, out => $out } }; } 1;

UPDATE: Seemingly relevant comments from Jenda at Re^3: Parrot, threads & fears for the future.:

"You can only transparently paralelize map{} if the executed block is side-effect-free....."

I'm actually not sure if my code here is side effects free or not. Hm...


Posts I'm looking at to see if I can use something there to get hashmap_parallel to work...:

•Re: Run N similar tasks in parallel

  • Comment on Using functional programming to reduce the pain of parallel-execution programming (with threads, forks, or name your poison)
  • Select or Download Code

Replies are listed 'Best First'.
Re: Using functional programming to reduce the pain of parallel-execution programming (with threads, forks, or name your poison)
by tilly (Archbishop) on Oct 23, 2006 at 18:08 UTC
    Whenever I have a problem like this I tend to reach for Run commands in parallel.

    Yes, I know there are slicker and more capable solutions out there. But that solution is one that I know works for me (at least on Unix), and it generally is enough to get the job done with a minimum of fuss.

      But it seems that this will only work to run command line programs in parallel with open3.

      I want something "functional" -- eg, something that will let me run a series of functions (which in perl I'm reading "code references") in parallel, like goog does with MapReduce.

      I'm thinking maybe something more along the lines of Parallel::Simple.

      Thanks all the same, because that was a very interesting read.

        Yes, that only works with command line programs.

        But it is pretty easy to convert function calls to command line programs.

        In any case, I wouldn't use Parallel::Simple because I have always valued the ability to control how many children I have at once. (Generally there is a fairly fixed amount of potential parallelism. Using that many kids get gets maximum throughput. If you go higher or lower your overall throughput goes down.) Therefore if you really don't want to convert function calls then I'd use something like Parallel::ForkManager and build my own solution.

threaded map
by LanceDeeply (Chaplain) on Oct 25, 2006 at 15:33 UTC
    Update: after reading Re: What is the fastest way to download a bunch of web pages? i've reworked this function to follow the same pattern. Notice that Thread::Queue only takes scalars, so if you want to push funky data through the queue, one way is to freeze/thaw it.
    use strict; use warnings; use Data::Dumper; use threads; use Thread::Queue; use Storable qw(freeze thaw); my $slow_matches_b = sub { sleep 1; return unless $_[0]; return 1 if $_[0] =~ /b/; }; my $test_strings = [ ('blee','blah','bloo', 'qoo', 'fwee' ) ]; my @results = threaded_map( 2, $slow_matches_b, @$test_strings ); print Dumper \@results; sub threaded_map { my $thread_count = shift; my $function = shift; # # setup work queues # my $id = 0; my $input_q = new Thread::Queue; my $result_q = new Thread::Queue; for (map { [ $id++, $_ ] } @_) { $input_q->enqueue(freeze($_)); } # # define worker function to run in each thread # my $worker_function = sub { while ( my $frozen_work = $input_q->dequeue_nb ) { my $work = thaw($frozen_work); my $id = $work->[0]; my $input_data = $work->[1]; my $result = $function->($input_data); my $frozen_result = freeze( [ $id, $result ] ); $result_q->enqueue($frozen_result); } }; # # create workers that will read from shared input queue and wri +te to shared output queue # my @threads = map{ threads->create( $worker_function ) } 1 .. $thr +ead_count; $_->join for @threads; # # join the results data together # my %results; while ( my $frozen_result = $result_q->dequeue_nb ) { my $result = thaw($frozen_result); $results{$result->[0]} = $result->[1]; } # # return results in order received # return map { $results{$_} } sort keys %results; }
Re: Using functional programming to reduce the pain of parallel-execution programming (with threads, forks, or name your poison)
by tphyahoo (Vicar) on Oct 25, 2006 at 16:18 UTC