### Re^2: [OT]: threading recursive subroutines.

by ikegami (Pope)
 on Feb 02, 2011 at 21:21 UTC

Turned out that using closures, while providing a simple design, actually caused the problem. The design below still needs work because it passes closures around, but it works in concept as shown by the Coro implementation below. (It probably defies the point to use Coro, but it's easier to prototype using it.)

```use strict;
use warnings;

use Engine qw( );

use constant NUM_WORKERS => 4;

my (\$engine, \$on_complete, \$n) = @_;
return \$on_complete->(0) if \$n == 0;
return \$on_complete->(1) if \$n == 1;
my (\$x,\$y);
\$engine->process_group(
sub { \$on_complete->(\$x+\$y) },
[ \&fibonacci_task => (sub { \$x = \$_[0] }, \$n-2) ],
[ \&fibonacci_task => (sub { \$y = \$_[0] }, \$n-1) ],
);
}

sub fibonacci {
my (\$engine, \$n) = @_;
my \$result;
\$engine->process_and_wait(
\&fibonacci_task => ( sub { \$result = \$_[0] }, \$n )
);
return \$result;
}

{
my \$engine = Engine->new(NUM_WORKERS);
printf("%s! = %s\n", \$_, fibonacci(\$engine, \$_)) for 1..10;
}
```1! = 1
2! = 1
3! = 2
4! = 3
5! = 5
6! = 8
7! = 13
8! = 21
9! = 34
10! = 55

The following belongs at the top of the above script:

```BEGIN {
package Semaphore;

use strict;
use warnings;

use Coro::Semaphore qw( );

sub new {
my (\$class, \$count) = @_;
my \$s = Coro::Semaphore->new(\$count);
return bless(\\$s, \$class);
}

sub down { my (\$self) = @_; my \$s = \$\$self;  return \$s->guard(); }
sub up   { my (\$self) = @_; my \$s = \$\$self;  return \$s->up(); }
sub wait { my (\$self) = @_; my \$s = \$\$self;  return \$s->wait(); }

\$INC{'Semaphore.pm'} = __FILE__;
}

BEGIN {
package Queue;

use strict;
use warnings;

use Coro::Channel qw( );

sub new {
my (\$class) = @_;
my \$q = Coro::Channel->new();
return bless(\\$q, \$class);
}

sub enqueue { my (\$self, \$item) = @_; my \$q = \$\$self; return \$q->pu
+t(\$item); }
sub dequeue { my (\$self) = @_;        my \$q = \$\$self; return \$q->ge
+t(); }

\$INC{'Queue.pm'} = __FILE__;
}

BEGIN {
package Engine;

use strict;
use warnings;

use Coro qw( async );

use Queue     qw( );
use Semaphore qw( );

sub new {
my (\$class, \$num_workers) = @_;
my \$q = Queue->new();
for (1..\$num_workers) {
async {
for (;;) {
\$q->dequeue()->();
}
};
}

return bless({ queue => \$q }, \$class);
}

sub process_group {
my (\$self, \$on_complete, @tasks) = @_;
my \$mutex = Semaphore->new(1);
\$self->{queue}->enqueue(sub {
\$self,
sub {
if (do { my \$lock = \$mutex->down(); --\$remaining ==
+0 }) {
\$on_complete->();
}
},
);
});
}
}

# Must not be called from within a worker or task.
sub process_and_wait {
my \$self = shift;
my \$done = Semaphore->new(0);
\$self->process_group(sub { \$done->up() }, [ @_ ]);
\$done->wait();
}

\$INC{'Engine.pm'} = __FILE__;
}

