#!/usr/bin/perl
use strict;
use warnings;

use Errno qw(EAGAIN);
use POSIX ':sys_wait_h';

use constant MAX_KIDS => 5;

# Flush STDOUT immediately so output buffered in the parent is not inherited
# by forked children and printed a second time when they exit.
$| = 1;

my %config;
$config{'sleep_time'} = 2;    # seconds to sleep between polls for finished children

# fork_proc( $coderef, @args )
#
# Fork a new process and execute the subroutine within the child.
#
# In the parent: returns the PID of the new child.
# In the child:  runs $coderef->(@args) and then ALWAYS exits, so the child can
#                never fall back into the caller's code. The exit status is 0
#                if the subroutine returns normally and 1 if it dies (unless
#                the subroutine calls exit() itself).
#
# Dies if no subroutine reference is given or if fork() fails with anything
# other than a temporary (EAGAIN) error; temporary errors are retried after
# one second.
sub fork_proc {
    my ( $command, @args ) = @_;
    die "Subroutine reference not specified" unless $command;

    while (1) {
        my $pid = fork();

        # Parent - successful fork.
        return $pid if $pid;

        if ( defined $pid ) {

            # Child. Trap exceptions so that a die() cannot unwind into an
            # eval somewhere up the (copied) parent call stack.
            my $ok = eval { $command->(@args); 1 };
            warn $@ unless $ok;
            exit( $ok ? 0 : 1 );
        }

        # fork() failed. Test the errno value rather than the message text,
        # which varies between operating systems and locales.
        die "Fork failed: $!" unless $! == EAGAIN;

        # Temporary error forking. Wait and retry.
        sleep 1;
    }
}

# _reap_finished( \%pids )
#
# Poll every PID in %pids once, without blocking. Each PID that is no longer
# running is deleted from %pids. Returns a list of [ $pid, $status ] pairs,
# where $status is the raw wait status ($?) of the child, or undef if
# waitpid() reported that the PID could not be waited for (returned -1).
sub _reap_finished {
    my ($pids) = @_;
    my @finished;

    # keys() returns a copy of the key list, so deleting inside the loop is safe.
    for my $pid ( keys %{$pids} ) {
        my $reaped = waitpid( $pid, WNOHANG );
        next if $reaped == 0;    # still running

        my $status = $reaped > 0 ? $? : undef;
        delete $pids->{$pid};
        push @finished, [ $pid, $status ];
    }

    return @finished;
}

# wait_for_pids( @pids )
#
# Wait until every child process in @pids has finished, polling every
# $config{'sleep_time'} seconds. Undefined or blank entries are ignored.
# Warns about each child that exits with a nonzero exit code or is killed
# by a signal, and dies once all children have finished if any of them failed.
sub wait_for_pids {
    my %pids = map { $_ => 1 } grep { defined && !/^\s*$/ } @_;
    my %failed;

    while ( keys %pids ) {
        for my $done ( _reap_finished( \%pids ) ) {
            my ( $pid, $status ) = @{$done};

            if ( !defined $status ) {
                warn "Unable to wait for pid $pid; its exit status is unknown";
                next;
            }

            # Low 7 bits of the wait status: signal that killed the child.
            # High byte: exit code.
            my $signal = $status & 127;
            my $exit   = $status >> 8;
            if ($signal) {
                warn "pid $pid was killed by signal $signal";
                $failed{$pid}++;
            }
            elsif ($exit) {
                warn "Nonzero exit code returned by pid $pid";
                $failed{$pid}++;
            }
        }

        # Don't waste processor time spinning - but don't sleep once done.
        sleep $config{'sleep_time'} if keys %pids;
    }

    if ( keys %failed ) {
        die "Non-zero exit code returned by the following PIDs: ",
            join( ", ", keys %failed ), ". Stopping";
    }

    return;
}

# parallel_update( code => $coderef, config => $filename, max_kids => $n )
#
# This subroutine takes the following named arguments:
#   code:     a subroutine reference
#   config:   a configuration filename (each line of the file must contain a
#             comma separated list of arguments to the subroutine passed in)
#   max_kids: maximum number of child processes to run concurrently.
#
# A number of processes will be forked, up to the maximum specified in the
# max_kids argument, and each process will execute the subroutine with the
# arguments given in the line read from the configuration file. Empty lines
# are skipped.
#
# As processes finish, they are reaped and a new process is started until all
# lines in the configuration file have been processed. Dies if an argument is
# missing, the file cannot be opened or closed, or a child process fails.
sub parallel_update {
    my %arg = @_;
    die "Subroutine reference not specified" unless $arg{'code'};
    die "Configuration filename not specified" unless $arg{'config'};
    die "Maximum number of child processes not specified"
        unless $arg{'max_kids'};

    my %pids;    # track PIDs of child processes.

    open my $conf_fh, "<", $arg{'config'}
        or die "Unable to open configuration file: $!";

    while ( my $line = <$conf_fh> ) {
        chomp $line;
        next if $line =~ /^$/;
        my @cfg = split /,/, $line;    # arguments from config file.

        # When all slots are in use, wait for at least one currently running
        # child to exit, check its exit status and drop it from the list of
        # running child processes. (The %pids test guards against spinning
        # forever when there is nothing left that could be reaped.)
        while ( %pids && scalar( keys %pids ) >= $arg{'max_kids'} ) {
            my @finished = _reap_finished( \%pids );

            for my $done (@finished) {
                my ( $pid, $status ) = @{$done};

                if ( !defined $status ) {
                    warn "Unable to wait for pid $pid; its exit status is unknown";
                    next;
                }

                my $signal = $status & 127;
                my $exit   = $status >> 8;
                die "pid $pid was killed by signal $signal. Stopped" if $signal;
                print "$pid returned $exit.\n";
                die "Nonzero exit code returned by pid $pid. Stopped" if $exit;
            }

            # Don't waste processor time spinning.
            sleep $config{'sleep_time'} unless @finished;
        }

        # start new process, store new PID.
        my $pid = fork_proc( $arg{'code'}, @cfg );
        $pids{$pid}++;
    }

    close $conf_fh or die "Problems closing configuration file: $!";

    # Cleanup the last few child processes as they finish.
    wait_for_pids( keys %pids );

    return;
}

# ------------------------------------------------------------------------------

# Example job: report the arguments received, pretend to work, then exit.
my $code = sub {
    print "$$ started. Got arguments: ", "@_", "\n";
    sleep 2;
    print "Foobar!\n";
    exit 0;
};

parallel_update(
    max_kids => MAX_KIDS,
    config   => "foo.csv",
    code     => $code,
);