vr has asked for the wisdom of the Perl Monks concerning the following question:

Edit. Please ignore. I forgot there's already a "1 while waitpid(-1, WNOHANG) > 0;" that was added a long time ago to an obscure part of the application to reap rarely spawned children -- it reaps the CLI tool's processes before IO::Async::PID gets a chance to see their exit. I should rewrite it now using e.g. a no-op IO::Async::PID on_exit handler. Sorry.
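For anyone landing here later: the conflict can be reproduced with core Perl alone. The demo below uses a real POSIX fork so it runs anywhere (the RL app spawns via "system 1, @args" on Windows, but the reaping race is the same) -- once the blanket reaper claims a child's status, a targeted wait for that same PID finds nothing, which is exactly what happens to the PID watcher:

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

my $pid = fork // die "fork failed: $!";
if ( $pid == 0 ) { exit 0 }            # child exits immediately

sleep 1;                               # let the child terminate

1 while waitpid( -1, WNOHANG ) > 0;    # blanket reaper claims the status

my $got = waitpid( $pid, 0 );          # a targeted wait now finds nothing
print "targeted waitpid returned: $got\n";   # -1: already reaped elsewhere
```

An IO::Async::PID watcher is, in effect, that targeted wait: if another piece of code reaps first, on_exit never fires.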

The OS is Windows. There's an existing IO::Async-based, somewhat complex and clumsy app which I want to extend. At some point in the workflow, lists of file names arrive, in batches of up to hundreds of items, but usually, say, 1-20. What I want to insert is a step that processes each file by invoking an external CLI tool, which takes, say, 5 to 60 seconds per file. It is to be done asynchronously by a few workers (2-5; I'll see when I get to observing the real load), and when the whole current batch is ready, I want to get control back and proceed with the workflow as it exists now.

Because it's Windows, the recommended way with IO::Async::Process won't work: it wants a real POSIX fork. "OK", I thought, I'll fire "system 1, @args" per file, and the returned PID will be used to add an IO::Async::PID watcher to the loop.

For now, I haven't even got to using the long-running CLI tool; I'm doing a simple "echo filename" instead for experiments. Actually, in the not-so-small SSCCE below, they are batches of numbers, not filenames. The code below runs fine as expected, indefinitely. But, when added almost "as-is" to the real-life (RL) app, it fails at an unpredictable moment, leaving me the unpleasant task of cleaning up the mess of an interrupted workflow.

  • there are 2 "workers" in the RL app for this task
  • filenames have spaces, so echo-ing them adds quotes; these lines are easily seen in the output
  • unlike in the SSCCE (output is too verbose already), on_exit has something like "say qq($filename done);", so I see 2 filenames echoed (as said above), but "done" is never reported
  • from this I conclude that the on_exit handlers for the 2 watched PIDs are missed, and the futures are therefore never marked as done
  • consequently, the convergent future never completes, and "batch done" is not reported either
  • the "active" hash stays at maximum capacity, and de-queueing stops
  • to observe failed on_exit calls, I was hoping to see the same (small) numeric id stay forever in the "active" list in the SSCCE output, but there are no failures with the SSCCE

Because the SSCCE works OK, I can only presume the on_exit handler fails to be called when the computer is under heavy load (?) while the RL app runs. External programs and other parts of the RL app do much heavy lifting. How is this even possible, and why? How can I simulate this load in the SSCCE? How can I debug this?
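One way to approximate "heavy load" in the SSCCE is to spawn a few busy-looping children alongside it. This is only a sketch with made-up names (start_load is not from the app), and on Windows Perl fork is emulated with threads, so it is a rough approximation at best:

```perl
use strict;
use warnings;

# Hypothetical load generator: spawn $n children that each burn CPU
# for $secs seconds, returning their PIDs so the caller can reap them.
sub start_load {
    my ( $n, $secs ) = @_;
    my @pids;
    for ( 1 .. $n ) {
        my $pid = fork // die "fork failed: $!";
        if ( $pid == 0 ) {
            my $end = time + $secs;
            1 while time < $end;    # busy loop: burn one core
            exit 0;
        }
        push @pids, $pid;
    }
    return @pids;
}

my @pids = start_load( 2, 1 );      # e.g. 2 cores busy for ~1 second
waitpid $_, 0 for @pids;
print "load finished\n";
```

Run something like this from a separate script (or bump the count to match your core count) while the SSCCE is ticking, and see whether the on_exit losses start to reproduce.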

(storing futures as values in the "active" hash was adopted from other parts of the workflow; it serves no purpose in this SSCCE, please ignore)

    use strict;
    use warnings;
    use feature 'say';
    use IO::Async::Loop;
    use IO::Async::Timer::Periodic;
    use lib '.';
    use Runner;

    use constant {
        WORKERS   => 5,
        INTERVAL  => 2,
        MAX_BATCH => 32,    # MIN is 1
    };

    my $event_id = 0;
    my ( $runner, $timer );
    my $loop = IO::Async::Loop->new;

    $runner = Runner->new( $loop, WORKERS );

    $timer = IO::Async::Timer::Periodic->new(
        interval => INTERVAL,
        on_tick  => sub {
            printf ">>>>> time: %d, workers busy: %d, batches: %d\n",
                time - $^T,
                scalar $loop->notifiers - 1,    # exclude timer
                scalar $timer->adopted_futures;
            my $batch = int rand MAX_BATCH;
            $timer->adopt_future(
                $runner->call([ $event_id .. $event_id + $batch ])
                    ->on_ready( sub {
                        my $f = shift;
                        my @id_list = $f->get;
                        say 'batch done: ', join ',', @id_list
                    })
            );
            $event_id += $batch + 1
        },
    );

    $timer->start;
    $loop->add( $timer );
    $loop->run;

and Runner.pm:

    package Runner;
    use strict;
    use warnings;
    use feature 'say';
    use Future;
    use IO::Async::PID;

    sub new {
        my ( $class, $loop, $max_active ) = @_;
        return bless {
            loop   => $loop,
            max    => $max_active,
            active => {},
            queue  => [],
        }, $class
    }

    sub call {
        my ( $self, $list ) = @_;
        my @F = map {
            my $F = $self->{ loop }->new_future;
            $F->{ _job_id } = $_;
            $F->on_ready( sub {
                delete $self->{ active }{ $F };
                $self->_run;
            })
        } @$list;
        push @{ $self->{ queue }}, @F;
        $self->_run;
        return Future->needs_all( @F )
    }

    sub _run {
        my $self = shift;
        if ( keys %{ $self->{ active }}) {
            say 'jobs active: ', join ',',
                map { $_->{ _job_id }} values %{ $self->{ active }}
        }
        while ( keys %{ $self->{ active }} < $self->{ max }
            and @{ $self->{ queue }}) {
            my $F = shift @{ $self->{ queue }};
            $self->{ active }{ $F } = $F;
            my $pid = system 1, 'echo', $F->{ _job_id };
            $self->{ loop }->add( IO::Async::PID->new(
                pid     => $pid,
                on_exit => sub {
                    # NB: this $self is the IO::Async::PID notifier itself,
                    # shadowing the Runner object from the enclosing scope
                    my ( $self, $exitcode ) = @_;
                    my $pid = $self->pid;
                    $F->done( $F->{ _job_id });
                },
            ));
        }
    }

    1;
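The scheduling part of Runner (cap the active set at max, refill from the queue whenever a job completes) can be exercised in isolation, without an event loop or futures. Below is a core-Perl model with illustrative names (MiniRunner, finish) that are not from the real app; finish stands in for on_exit resolving a job's future:

```perl
use strict;
use warnings;

# Minimal model of Runner's scheduling: at most {max} jobs active at
# once; completing a job pulls the next one off the queue.
package MiniRunner;

sub new {
    my ( $class, $max ) = @_;
    bless { max => $max, active => {}, queue => [], order => [] }, $class
}

sub call {
    my ( $self, $list ) = @_;
    push @{ $self->{ queue }}, @$list;
    $self->_run;
}

sub _run {
    my $self = shift;
    while ( keys %{ $self->{ active }} < $self->{ max }
        and @{ $self->{ queue }}) {
        my $id = shift @{ $self->{ queue }};
        $self->{ active }{ $id } = 1;
    }
}

sub finish {    # stands in for on_exit marking the job's future done
    my ( $self, $id ) = @_;
    delete $self->{ active }{ $id };
    push @{ $self->{ order }}, $id;
    $self->_run;
}

package main;

my $r = MiniRunner->new( 2 );
$r->call([ 1 .. 5 ]);
print "active after enqueue: ",
    join( ',', sort { $a <=> $b } keys %{ $r->{ active }}), "\n";   # 1,2
$r->finish( 1 );
print "active after one finish: ",
    join( ',', sort { $a <=> $b } keys %{ $r->{ active }}), "\n";   # 2,3
```

This also shows why a lost completion is fatal to the design: if finish is never called for an active job, that slot is occupied forever and the queue stalls -- exactly the "active hash stays at maximum capacity" symptom above.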