# Semaphore: thin object wrapper around Coro::Semaphore.
# The object is a blessed reference to a scalar holding the Coro::Semaphore.
BEGIN {
    package Semaphore;

    use strict;
    use warnings;

    use Coro::Semaphore qw( );

    # Semaphore->new($count)
    # Creates the underlying Coro::Semaphore with the given initial count.
    sub new {
        my ($class, $count) = @_;
        my $s = Coro::Semaphore->new($count);
        return bless(\$s, $class);
    }

    # $sem->down()
    # Returns the result of Coro::Semaphore's guard(). Callers in this file
    # keep it in a lexical and rely on the guard releasing the semaphore
    # when that lexical goes out of scope (see the Coro::Semaphore docs).
    sub down {
        my ($self) = @_;
        my $s = $$self;
        return $s->guard();
    }

    # $sem->up()
    # Delegates to Coro::Semaphore's up().
    sub up {
        my ($self) = @_;
        my $s = $$self;
        return $s->up();
    }

    # $sem->wait()
    # Delegates to Coro::Semaphore's wait(). This is only ever invoked as a
    # method, so it does not clash with the wait() builtin.
    sub wait {
        my ($self) = @_;
        my $s = $$self;
        return $s->wait();
    }

    # Mark the module as loaded so "use Semaphore" below does not look for
    # a Semaphore.pm on disk.
    $INC{'Semaphore.pm'} = __FILE__;
}

# Queue: thin object wrapper around Coro::Channel.
# The object is a blessed reference to a scalar holding the Coro::Channel.
BEGIN {
    package Queue;

    use strict;
    use warnings;

    use Coro::Channel qw( );

    # Queue->new()
    # Creates the underlying Coro::Channel with its default settings.
    sub new {
        my ($class) = @_;
        my $q = Coro::Channel->new();
        return bless(\$q, $class);
    }

    # $queue->enqueue($item)
    # Delegates to Coro::Channel's put().
    sub enqueue {
        my ($self, $item) = @_;
        my $q = $$self;
        return $q->put($item);
    }

    # $queue->dequeue()
    # Delegates to Coro::Channel's get() and returns the item it yields.
    sub dequeue {
        my ($self) = @_;
        my $q = $$self;
        return $q->get();
    }

    # Mark the module as loaded so "use Queue" below does not look for a
    # Queue.pm on disk.
    $INC{'Queue.pm'} = __FILE__;
}

# Engine: a pool of Coro workers that run code refs taken from a shared Queue.
BEGIN {
    package Engine;

    use strict;
    use warnings;

    use Coro qw( async );
    use Queue qw( );
    use Semaphore qw( );

    # Engine->new($num_workers)
    # Creates the job queue and starts $num_workers async workers. Each
    # worker loops forever, dequeuing a code ref and calling it with no
    # arguments.
    sub new {
        my ($class, $num_workers) = @_;

        my $q = Queue->new();

        for (1..$num_workers) {
            async {
                for (;;) {
                    $q->dequeue()->();
                }
            };
        }

        return bless({ queue => $q }, $class);
    }

    # $engine->process_group($on_complete, @tasks)
    #
    # Enqueues every task and arranges for $on_complete->() to be called
    # once all of them have reported completion.
    #
    # Each task is an array ref: [ $task_sub, $task_on_complete, @task_args ].
    # A worker calls $task_sub->($engine, $done_cb, @task_args). When the
    # task invokes $done_cb->(@results), @results are forwarded to
    # $task_on_complete and the group's outstanding-task counter is
    # decremented. Every invocation of $done_cb decrements the counter, so
    # a task is expected to call it exactly once.
    #
    # With no tasks at all, $on_complete is called immediately; otherwise
    # nothing would ever decrement the counter and the group would never be
    # reported as complete.
    sub process_group {
        my ($self, $on_complete, @tasks) = @_;

        if (!@tasks) {
            $on_complete->();
            return;
        }

        my $mutex     = Semaphore->new(1);
        my $remaining = @tasks;

        for my $task (@tasks) {
            my ($task_sub, $task_on_complete, @task_args) = @$task;

            $self->{queue}->enqueue(sub {
                $task_sub->(
                    $self,
                    sub {
                        $task_on_complete->(@_);

                        # $lock lives only inside the do-block, so the
                        # mutex covers just the decrement-and-test and is
                        # no longer held while $on_complete runs.
                        if (do { my $lock = $mutex->down(); --$remaining == 0 }) {
                            $on_complete->();
                        }
                    },
                    @task_args,
                );
            });
        }

        return;
    }

    # $engine->process_and_wait($task_sub, $task_on_complete, @task_args)
    #
    # Submits a single task (the arguments form one task tuple as accepted
    # by process_group) and blocks the calling coro until it has completed.
    #
    # Must not be called from within a worker or task.
    # NOTE(review): presumably because the blocked worker would no longer
    # service the queue -- confirm.
    sub process_and_wait {
        my $self = shift;

        my $done = Semaphore->new(0);
        $self->process_group(sub { $done->up() }, [ @_ ]);
        $done->wait();
    }

    # Mark the module as loaded for any later "use Engine".
    $INC{'Engine.pm'} = __FILE__;
}