use strict;
use warnings;
use feature 'say';

use IO::Async::Loop;
use IO::Async::Timer::Periodic;

use lib '.';
use Runner;

# Demo driver: on every timer tick, hand a random-sized batch of consecutive
# event ids to a Runner, which works through them with a bounded number of
# concurrently running child processes.

use constant {
    WORKERS   => 5,     # concurrency cap handed to Runner->new
    INTERVAL  => 2,     # period of the submitting timer
    MAX_BATCH => 32,    # MIN is 1
};

my $event_id = 0;       # next unused event id; grows monotonically

my $loop   = IO::Async::Loop->new;
my $runner = Runner->new( $loop, WORKERS );

# Declared before assignment because the on_tick closure refers to $timer.
my $timer;
$timer = IO::Async::Timer::Periodic->new(
    interval => INTERVAL,
    on_tick  => sub {
        printf ">>>>> time: %d, workers busy: %d, batches: %d\n",
            time - $^T,
            scalar( $loop->notifiers ) - 1,    # exclude timer
            scalar( $timer->adopted_futures );

        # int rand MAX_BATCH is 0 .. MAX_BATCH-1; the inclusive range below
        # therefore submits between 1 and MAX_BATCH ids.
        my $batch = int rand MAX_BATCH;

        # The timer adopts the batch future so that something keeps a
        # reference to it until the batch has finished.
        $timer->adopt_future(
            $runner->call( [ $event_id .. $event_id + $batch ] )->on_ready(
                sub {
                    my $f       = shift;
                    my @id_list = $f->get;
                    say 'batch done: ', join ',', @id_list;
                }
            )
        );

        $event_id += $batch + 1;
    },
);

$timer->start;
$loop->add($timer);
$loop->run;

####
# NOTE(review): everything below presumably lives in ./Runner.pm, since the
# script above loads it with `use lib '.'; use Runner;` -- confirm.

package Runner;

use strict;
use warnings;
use feature 'say';

use Future;
use IO::Async::PID;

# Runner->new( $loop, $max_active )
#
# Builds a job runner bound to $loop. At most $max_active jobs are kept in
# the "active" set at once; the rest wait in a FIFO queue.
sub new {
    my ( $class, $loop, $max_active ) = @_;

    return bless {
        loop   => $loop,
        max    => $max_active,
        active => {},    # stringified future => job id, for running jobs
        queue  => [],    # pending [ $job_id, $future ] pairs, oldest first
    }, $class;
}

# $runner->call( \@job_ids )
#
# Queues one job per element of @job_ids, starts as many as the concurrency
# cap permits, and returns a Future->needs_all over the per-job futures.
# Each per-job future is completed with its own job id.
sub call {
    my ( $self, $list ) = @_;

    my @futures;
    for my $job_id (@$list) {
        my $future = $self->{loop}->new_future;

        # When a job finishes, free its slot and let queued work start.
        $future->on_ready(
            sub {
                delete $self->{active}{$future};
                $self->_run;
            }
        );

        # The job id travels with the queue entry rather than being stored
        # inside the Future object, whose internals are not public API.
        push @{ $self->{queue} }, [ $job_id, $future ];
        push @futures, $future;
    }

    $self->_run;

    return Future->needs_all(@futures);
}

# $runner->_run
#
# Private. Prints the ids of the currently active jobs (if any), then moves
# jobs from the queue into the active set until either the cap is reached or
# the queue is empty, spawning one child process per job.
sub _run {
    my $self = shift;

    my $active = $self->{active};
    my $queue  = $self->{queue};

    if (%$active) {
        say 'jobs active: ', join ',', values %$active;
    }

    while ( scalar( keys %$active ) < $self->{max} and @$queue ) {
        my ( $job_id, $future ) = @{ shift @$queue };
        $active->{$future} = $job_id;

        # system(1, ...) is the Win32 "spawn and return immediately" form
        # described in perlport; its return value is used as the child's pid.
        # NOTE(review): on other platforms this would try to run a program
        # named "1" -- confirm this code is Windows-only.
        my $pid = system 1, 'echo', $job_id;

        # The exit status is deliberately not inspected: the job counts as
        # done as soon as the child has gone away.
        $self->{loop}->add(
            IO::Async::PID->new(
                pid     => $pid,
                on_exit => sub { $future->done($job_id) },
            )
        );
    }

    return;
}

1;