Beefy Boxes and Bandwidth Generously Provided by pair Networks
Syntactic Confectionery Delight
 
PerlMonks  

Re^2: [OT]: threading recursive subroutines.

by ikegami (Pope)
on Feb 02, 2011 at 21:21 UTC ( #885856=note: print w/ replies, xml ) Need Help??


in reply to Re: [OT]: threading recursive subroutines.
in thread [OT]: threading recursive subroutines.

Turned out that using closures, while providing a simple design, actually caused the problem. The design below still needs work because it passes closures around, but it works in concept as shown by the Coro implementation below. (It probably defies the point to use Coro, but it's easier to prototype using it.)

use strict; use warnings; use Engine qw( ); use constant NUM_WORKERS => 4; sub fibonacci_task { my ($engine, $on_complete, $n) = @_; return $on_complete->(0) if $n == 0; return $on_complete->(1) if $n == 1; my ($x,$y); $engine->process_group( sub { $on_complete->($x+$y) }, [ \&fibonacci_task => (sub { $x = $_[0] }, $n-2) ], [ \&fibonacci_task => (sub { $y = $_[0] }, $n-1) ], ); } sub fibonacci { my ($engine, $n) = @_; my $result; $engine->process_and_wait( \&fibonacci_task => ( sub { $result = $_[0] }, $n ) ); return $result; } { my $engine = Engine->new(NUM_WORKERS); printf("%s! = %s\n", $_, fibonacci($engine, $_)) for 1..10; }
1! = 1 2! = 1 3! = 2 4! = 3 5! = 5 6! = 8 7! = 13 8! = 21 9! = 34 10! = 55

The following belongs at the top of the above script:

BEGIN { package Semaphore; use strict; use warnings; use Coro::Semaphore qw( ); sub new { my ($class, $count) = @_; my $s = Coro::Semaphore->new($count); return bless(\$s, $class); } sub down { my ($self) = @_; my $s = $$self; return $s->guard(); } sub up { my ($self) = @_; my $s = $$self; return $s->up(); } sub wait { my ($self) = @_; my $s = $$self; return $s->wait(); } $INC{'Semaphore.pm'} = __FILE__; } BEGIN { package Queue; use strict; use warnings; use Coro::Channel qw( ); sub new { my ($class) = @_; my $q = Coro::Channel->new(); return bless(\$q, $class); } sub enqueue { my ($self, $item) = @_; my $q = $$self; return $q->pu +t($item); } sub dequeue { my ($self) = @_; my $q = $$self; return $q->ge +t(); } $INC{'Queue.pm'} = __FILE__; } BEGIN { package Engine; use strict; use warnings; use Coro qw( async ); use Queue qw( ); use Semaphore qw( ); sub new { my ($class, $num_workers) = @_; my $q = Queue->new(); for (1..$num_workers) { async { for (;;) { $q->dequeue()->(); } }; } return bless({ queue => $q }, $class); } sub process_group { my ($self, $on_complete, @tasks) = @_; my $mutex = Semaphore->new(1); my $remaining = @tasks; for my $task (@tasks) { my ($task_sub, $task_on_complete, @task_args) = @$task; $self->{queue}->enqueue(sub { $task_sub->( $self, sub { $task_on_complete->(@_); if (do { my $lock = $mutex->down(); --$remaining == +0 }) { $on_complete->(); } }, @task_args, ); }); } } # Must not be called from within a worker or task. sub process_and_wait { my $self = shift; my $done = Semaphore->new(0); $self->process_group(sub { $done->up() }, [ @_ ]); $done->wait(); } $INC{'Engine.pm'} = __FILE__; }


Comment on Re^2: [OT]: threading recursive subroutines.
Select or Download Code

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://885856]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others drinking their drinks and smoking their pipes about the Monastery: (13)
As of 2014-10-22 13:12 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    For retirement, I am banking on:










    Results (118 votes), past polls