Perl: the Markov chain saw PerlMonks

### Re^2: [OT]: threading recursive subroutines.

by ikegami (Pope)
 on Feb 02, 2011 at 21:21 UTC ( #885856=note: print w/replies, xml ) Need Help??

It turned out that using closures, while providing a simple design, actually caused the problem. The design below still needs work because it passes closures around, but it works in concept, as shown by the Coro implementation that follows. (It probably defeats the point to use Coro, but it's easier to prototype with it.)

```perl
use strict;
use warnings;

use Engine qw( );

use constant NUM_WORKERS => 4;

# Task that computes the $n-th Fibonacci number asynchronously.
#
#   $engine      - object providing process_group($on_complete, @tasks)
#   $on_complete - code ref invoked with the result as its only argument
#   $n           - non-negative integer index into the sequence
#
# For $n >= 2 the two sub-problems are submitted to the engine as a group;
# each sub-task stores its result in $x / $y, and the group's completion
# callback reports their sum.
sub fibonacci_task {
    my ($engine, $on_complete, $n) = @_;
    return $on_complete->(0) if $n == 0;
    return $on_complete->(1) if $n == 1;
    my ($x, $y);
    $engine->process_group(
        sub { $on_complete->($x + $y) },
        [ \&fibonacci_task => (sub { $x = $_[0] }, $n - 2) ],
        [ \&fibonacci_task => (sub { $y = $_[0] }, $n - 1) ],
    );
}

# Synchronous wrapper: submits fibonacci_task for $n to $engine, blocks in
# process_and_wait until the task reports back, and returns the value the
# task passed to its completion callback.
sub fibonacci {
    my ($engine, $n) = @_;
    my $result;
    $engine->process_and_wait(
        \&fibonacci_task => ( sub { $result = $_[0] }, $n )
    );
    return $result;
}

# Driver: create an engine with NUM_WORKERS workers and print the first ten
# Fibonacci numbers, one per line.
{
    my $engine = Engine->new(NUM_WORKERS);
    printf("%s! = %s\n", $_, fibonacci($engine, $_)) for 1..10;
}
```

```
1! = 1
2! = 1
3! = 2
4! = 3
5! = 5
6! = 8
7! = 13
8! = 21
9! = 34
10! = 55
```

The following belongs at the top of the above script:

```perl
BEGIN {
    # Semaphore: thin wrapper around Coro::Semaphore. The object is a blessed
    # reference to a scalar that holds the underlying Coro::Semaphore.
    package Semaphore;

    use strict;
    use warnings;

    use Coro::Semaphore qw( );

    # Semaphore->new($count): create a semaphore with the given initial count.
    sub new {
        my ($class, $count) = @_;
        my $s = Coro::Semaphore->new($count);
        return bless(\$s, $class);
    }

    # down() returns whatever Coro::Semaphore's guard() returns; the caller
    # keeps that value alive for as long as the semaphore should stay held.
    sub down { my ($self) = @_; my $s = $$self;  return $s->guard(); }
    # up() and wait() delegate directly to the same-named Coro::Semaphore methods.
    sub up   { my ($self) = @_; my $s = $$self;  return $s->up(); }
    sub wait { my ($self) = @_; my $s = $$self;  return $s->wait(); }

    # Mark the module as loaded so a later "use Semaphore" does not look for a file.
    $INC{'Semaphore.pm'} = __FILE__;
}

BEGIN {
    # Queue: thin wrapper around Coro::Channel. The object is a blessed
    # reference to a scalar that holds the underlying channel.
    package Queue;

    use strict;
    use warnings;

    use Coro::Channel qw( );

    # Queue->new(): create a queue backed by a new Coro::Channel.
    sub new {
        my ($class) = @_;
        my $q = Coro::Channel->new();
        return bless(\$q, $class);
    }

    # enqueue($item) forwards to the channel's put(); dequeue() to its get().
    sub enqueue { my ($self, $item) = @_; my $q = $$self; return $q->put($item); }
    sub dequeue { my ($self) = @_;        my $q = $$self; return $q->get(); }

    # Mark the module as loaded so a later "use Queue" does not look for a file.
    $INC{'Queue.pm'} = __FILE__;
}

BEGIN {
    # Engine: a pool of Coro workers that pull code refs off a shared queue
    # and call them.
    package Engine;

    use strict;
    use warnings;

    use Coro qw( async );

    use Queue     qw( );
    use Semaphore qw( );

    # Engine->new($num_workers)
    # Starts $num_workers async blocks; each loops forever, dequeuing a code
    # ref from the shared queue and invoking it with no arguments.
    sub new {
        my ($class, $num_workers) = @_;
        my $q = Queue->new();
        for (1..$num_workers) {
            async {
                for (;;) {
                    $q->dequeue()->();
                }
            };
        }

        return bless({ queue => $q }, $class);
    }

    # $engine->process_group($on_complete, @tasks)
    # Each task is an array ref: [ $task_sub, $task_on_complete, @task_args ].
    # Every task is queued as a closure that calls
    #     $task_sub->($engine, $wrapped_on_complete, @task_args)
    # where the wrapper first forwards the task's result to $task_on_complete
    # and then decrements the shared counter. The wrapper that brings the
    # counter to zero calls $on_complete (with no arguments).
    #
    # NOTE: with an empty @tasks nothing is queued, so $on_complete is never
    # called.
    sub process_group {
        my ($self, $on_complete, @tasks) = @_;
        my $mutex = Semaphore->new(1);
        my $remaining = @tasks;
        for my $task (@tasks) {
            my ($task_sub, $task_on_complete, @task_args) = @$task;
            $self->{queue}->enqueue(sub {
                $task_sub->(
                    $self,
                    sub {
                        $task_on_complete->(@_);
                        # $lock holds the mutex only for the duration of the
                        # do-block, i.e. while the counter is updated and tested.
                        if (do { my $lock = $mutex->down(); --$remaining == 0 }) {
                            $on_complete->();
                        }
                    },
                    @task_args,
                );
            });
        }
    }

    # $engine->process_and_wait($task_sub, $task_on_complete, @task_args)
    # Submits the single task described by the arguments as a group of one
    # and blocks until it has completed.
    # Must not be called from within a worker or task.
    sub process_and_wait {
        my $self = shift;
        my $done = Semaphore->new(0);
        $self->process_group(sub { $done->up() }, [ @_ ]);
        $done->wait();
    }

    # Mark the module as loaded so a later "use Engine" does not look for a file.
    $INC{'Engine.pm'} = __FILE__;
}
```

Create A New User
Node Status?
node history
Node Type: note [id://885856]
help
Chatterbox?
and all is quiet...

How do I use this? | Other CB clients
Other Users?
Others chilling in the Monastery: (5)
As of 2017-08-20 07:02 GMT
Sections?
Information?
Find Nodes?
Leftovers?
Voting Booth?
Who is your favorite scientist and why?

Results (313 votes). Check out past polls.

Notices?