note
tphyahoo
Okay, I have here stub code for a parallel "threadedreduce", using what I think is pretty much functional style programming.
<p>
My goal is along the same lines as my post yesterday about concatenating letters (mentioned in the OP), but more honed. Basically, I have a slow function for demo purposes, and I want to speed it up by executing code in parallel, but in such a way that I can abstract the ugly bits (locks, threads, etcetera) into a module that I maintain separately.
<p>
So, here, my function is in the coderef <code>$slow_matches_b</code>. It's slow because it sleeps for one second. What it does is return true if the input contains the letter b.
<p>
I then have <code>sub slowgrep</code>, which should do the same thing that grep does, but in a more explicit way. <code>slowgrep</code> is built on <code>sub reduce</code>. This function is similar to the <code>reduce</code> you get in List::Util, but like Frank Sinatra, I did it my way, so it may not be quite the same. I actually modeled it after the JavaScript reduce example code in the Joel on Software essay <a href="http://www.joelonsoftware.com/items/2006/08/01.html">Can Your Programming Language Do This?</a>.
<p>
Reduce is one of the basic building blocks of functional programming, and half of Google's celebrated MapReduce. This particular implementation of reduce executes serially.
<p>
Then I have the <code>sub fastgrep</code>, which should also do the same thing that grep does, but be faster because it uses some form of parallelization. Every line of this function is the same as slowgrep, except it is based on <code>threadedreduce</code> instead of <code>reduce</code>.
<p>
<code>threadedreduce</code> should do the same thing as <code>reduce</code> except... it doesn't. As the tests show.
In fact, it is just a stub, and it actually uses forks rather than threads, but you get the idea. I think this function could be implemented with forks, with threads, with POE, or with whatever your favorite technique is for cranking up parallelization in your environment. The point is that it lets programmers process lists in parallel while hiding away the complexity of the parallelized code.
<p>
This is where I am hoping the monastery can step in and help me out. Can someone out there make <code>threadedreduce</code> work? <code>threadedreduce</code> could then be used to build threadedgrep, threadedUserAgent (same as UserAgent::Parallel, but you can use WWW::Mechanize as the UA instead of the UA inherited from LWP::UserAgent)... or... well... lots of things. And in a way that's beautiful, functional, and maintainable.
<code>
$ ./test_reduce.pl
not ok 1 - parallel-y executing code works
# Failed test 'parallel-y executing code works'
# in ./test_reduce.pl at line 15.
fast matches: $VAR1 = [];
ok 2 - serially executing code works
slow matches: $VAR1 = [
'blee',
'blah',
'bloo'
];
1..2
# Looks like you failed 1 test of 2.
$ cat test_reduce.pl
#!/usr/bin/perl
use strict;
use warnings;
use Test::More qw( no_plan );
use Data::Dumper;
use Parallel::ForkManager;
my $slow_matches_b = sub { sleep 1; return 1 if $_[0] =~ /b/; };
my $test_strings = [ ('blee','blah','bloo', 'qoo', 'fwee' ) ];
my $fast_matches = fastgrep( $slow_matches_b, $test_strings );
ok( @$fast_matches, "parallel-y executing code works" );
print "fast matches: " . Dumper($fast_matches);
# should dump out blee, blah, bloo, but not fwee or qoo
my $slow_matches = slowgrep( $slow_matches_b, $test_strings );
ok( @$slow_matches, "serially executing code works" );
print "slow matches: " . Dumper($slow_matches);
sub fastgrep {
my $test_function = shift;
my $array = shift;
my $grep_builder = sub {
my $matches = shift;
my $test_el = shift;
push @$matches, $test_el if $test_function->($test_el);
return $matches;
};
return threadedreduce ( $grep_builder, $array, [])
}
sub slowgrep {
my $test_function = shift;
my $array = shift;
my $grep_builder = sub {
my $matches = shift;
my $test_el = shift;
push @$matches, $test_el if $test_function->($test_el);
return $matches;
};
return reduce( $grep_builder, $array, [])
}
# just a stub, hoping someone can help me with the threading
# (each forked child updates its own copy of $result, so the parent still returns $init)
sub threadedreduce {
my $function = shift;
my $array = shift;
my $init = shift;
my $pm = Parallel::ForkManager->new(10);
my $result = $init;
for my $el ( @$array) {
$pm->start and next;
$result = $function->( $result, $el);
$pm->finish;
}
$pm->wait_all_children;
return $result;
}
sub reduce {
my $function = shift;
my $array = shift;
my $init = shift;
my $result = $init;
for my $el ( @$array) {
$result = $function->( $result, $el)
}
return $result;
}
</code>
UPDATE: I'm actually thinking the most promising avenue may be to use [mod://Parallel::Queue], along the lines [diotalevi] suggested yesterday in [id://579080].
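<p>
For what it's worth, here is a minimal sketch of one way the stub might be made to return results, using plain fork and pipes with only core modules (Storable and POSIX). The name forkedreduce is hypothetical, and this is not a general parallel reduce: it assumes the reducing function only ever pushes onto an array ref, so that each child can start from an empty list and the parent can concatenate the partial lists in input order. It also forks one child per element with no upper limit, which is only reasonable for short lists.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Storable qw( store_fd fd_retrieve );
use POSIX ();

# Hypothetical helper, not part of any module.
# Assumption: $function only appends to the array ref it is given, so
# partial results from separate children can simply be concatenated.
sub forkedreduce {
    my ( $function, $array, $init ) = @_;
    my @jobs;
    for my $el (@$array) {
        pipe( my $reader, my $writer ) or die "pipe failed: $!";
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;
        if ( $pid == 0 ) {
            # Child: reduce this one element into a fresh, empty list
            # and send the serialized partial result to the parent.
            close $reader;
            my $partial = $function->( [], $el );
            store_fd( $partial, $writer );
            close $writer;          # flushes the pipe
            POSIX::_exit(0);        # skip END blocks inherited from the parent
        }
        # Parent: keep the read end and move on to the next element.
        close $writer;
        push @jobs, [ $pid, $reader ];
    }

    # Collect the partial lists in input order and concatenate them.
    my $result = [@$init];
    for my $job (@jobs) {
        my ( $pid, $reader ) = @$job;
        my $partial = fd_retrieve($reader);
        close $reader;
        waitpid( $pid, 0 );
        push @$result, @$partial;
    }
    return $result;
}

my $slow_matches_b = sub { sleep 1; return $_[0] =~ /b/ ? 1 : 0 };
my $grep_builder   = sub {
    my ( $matches, $test_el ) = @_;
    push @$matches, $test_el if $slow_matches_b->($test_el);
    return $matches;
};
my $matches = forkedreduce( $grep_builder, [qw( blee blah bloo qoo fwee )], [] );
print join( ',', @$matches ), "\n";    # prints "blee,blah,bloo"
```

All five sleeps run at the same time, so the whole thing should take roughly one second rather than five. The trade-off is that the accumulator is no longer threaded through every call, which is why this only suits reducers whose partial results can be merged afterwards.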
579540
579552