#!/usr/bin/perl
# Run a subroutine in parallel child processes: one child per line of a
# comma-separated configuration file, with at most MAX_KIDS children running
# at the same time.

use strict;
use warnings;
use IO::Handle;
use POSIX ':sys_wait_h';

use constant MAX_KIDS => 5;

my %config;
$config{'sleep_time'} = 2;    # seconds to sleep between waitpid() polls

# fork_proc($command, @args)
#
# Fork a child process that runs $command->(@args) and then exits.
# Returns the child's PID in the parent; never returns in the child.
# If the subroutine returns normally (or calls exit itself) the child's exit
# status is 0 / whatever it passed to exit; if the subroutine dies, the error
# is emitted as a warning and the child exits with status 1.
# A temporary fork failure (EAGAIN) is retried after a one-second pause; any
# other fork failure is fatal.
sub fork_proc {
    my ($command, @args) = @_;
    die "Subroutine reference not specified" unless $command;

    # Flush pending output so the child does not inherit, and later re-print,
    # data still sitting in the parent's buffer.
    STDOUT->flush;
    STDERR->flush;

    while (1) {
        my $pid = fork();
        if ($pid) {
            # Parent - successful fork.
            return $pid;
        }
        elsif (defined $pid) {
            # Child: run the work, then always exit so the child can never
            # fall back into the caller's loop and start forking on its own.
            my $ok = eval { $command->(@args); 1 };
            if (!$ok) {
                warn "Child $$ failed: ", ($@ || 'unknown error');
                exit 1;
            }
            exit 0;
        }
        elsif ($!{EAGAIN}) {
            # Temporary error forking (process limit reached). Wait and retry.
            sleep 1;
        }
        else {
            # Bad error
            die "Fork failed: $!";
        }
    }
}

# _failure_reason($status)
#
# Return undef when $status (a raw wait status as found in $?) means the child
# exited normally with code 0; otherwise return a short description of the
# failure. An undef $status means the child could not be waited for at all.
sub _failure_reason {
    my ($status) = @_;
    return 'could not be waited for' unless defined $status;
    my $signal = $status & 127;
    return "killed by signal $signal" if $signal;
    my $exit = $status >> 8;
    return $exit ? "exit code $exit" : undef;
}

# _reap_finished(\%pids)
#
# One non-blocking pass over the PIDs (hash keys) in %$pids. Every child that
# has exited, or that waitpid() reports is not our child, is removed from
# %$pids and returned as a [ $pid, $status ] pair; $status is the raw wait
# status ($?), or undef when the PID could not be waited for.
sub _reap_finished {
    my ($pids) = @_;
    my @finished;
    for my $pid (keys %$pids) {
        my $reaped = waitpid($pid, WNOHANG);
        if ($reaped > 0) {
            push @finished, [ $reaped, $? ];
            delete $pids->{$pid};
        }
        elsif ($reaped < 0) {
            # Not (or no longer) our child. Drop it, otherwise the callers
            # would poll for it forever.
            warn "Unable to wait for pid $pid: $!";
            push @finished, [ $pid, undef ];
            delete $pids->{$pid};
        }
        # $reaped == 0: still running, check again on the next pass.
    }
    return @finished;
}

# _wait_for_any(\%pids)
#
# Poll the PIDs in %$pids until at least one child has finished, sleeping
# $config{'sleep_time'} seconds between passes. Returns the [ $pid, $status ]
# pairs from _reap_finished(); returns an empty list if %$pids is empty.
sub _wait_for_any {
    my ($pids) = @_;
    return unless keys %$pids;
    while (1) {
        my @finished = _reap_finished($pids);
        return @finished if @finished;
        sleep $config{'sleep_time'};    # Don't waste processor time spinning.
    }
}

# wait_for_pids(@pids)
#
# Wait until every listed child process (undef and blank entries are ignored)
# has finished. Each child that exited non-zero, was killed by a signal, or
# could not be waited for triggers a warning; after all have finished, dies
# listing the failed PIDs if there were any.
sub wait_for_pids {
    my %pids = map { $_ => 1 } grep { defined && !/^\s*$/ } @_;
    my %failed;

    while (keys %pids) {
        for my $done (_wait_for_any(\%pids)) {
            my ($pid, $status) = @$done;
            my $reason = _failure_reason($status);
            next unless defined $reason;
            warn "Child pid $pid failed ($reason)";
            $failed{$pid}++;
        }
    }

    if (keys %failed) {
        die "The following PIDs failed: ",
            join(", ", sort { $a <=> $b } keys %failed), ". Stopping";
    }
    return;
}

sub parallel_update {
    # This subroutine takes the following named arguments:
    #   code:     a subroutine reference.
    #   config:   a configuration filename. Each non-blank line of the file
    #             must contain a comma-separated list of arguments for the
    #             subroutine.
    #   max_kids: maximum number of child processes to run concurrently
    #             (a positive integer).
    # A number of processes are forked, up to the maximum given in max_kids,
    # and each process executes the subroutine with the arguments read from
    # one line of the configuration file. As processes finish, they are
    # reaped and a new process is started, until all lines in the
    # configuration file have been processed; then the remaining children
    # are waited for.
    # Dies as soon as a child reaped while the pool is full has failed (any
    # children still running at that point are not waited for); failures
    # among the final children are reported by wait_for_pids().
    my %arg = @_;
    die "Subroutine reference not specified" unless $arg{'code'};
    die "Configuration filename not specified" unless $arg{'config'};
    die "Maximum number of child processes not specified"
        unless $arg{'max_kids'};
    # A non-positive or non-numeric limit would make the pool wait forever.
    die "Maximum number of child processes must be a positive integer"
        unless $arg{'max_kids'} =~ /\A\d+\z/ && $arg{'max_kids'} > 0;

    my %pids;    # PIDs of currently running child processes.

    open my $conf_fh, '<', $arg{'config'}
        or die "Unable to open configuration file: $!";

    LINE: while (my $line = <$conf_fh>) {
        chomp $line;
        next LINE if $line =~ /\A\s*\z/;    # skip blank lines
        my @cfg = split /,/, $line;         # arguments from config file.

        # Pool is full: wait for at least one running child to exit, check
        # its status and remove it from the list of running children.
        if (keys %pids >= $arg{'max_kids'}) {
            for my $done (_wait_for_any(\%pids)) {
                my ($pid, $status) = @$done;
                print "$pid returned ",
                    (defined $status ? $status >> 8 : 'unknown status'), ".\n";
                my $reason = _failure_reason($status);
                die "Child pid $pid failed ($reason). Stopped"
                    if defined $reason;
            }
        }

        # Start a new process for this line and remember its PID.
        my $pid = fork_proc($arg{'code'}, @cfg);
        $pids{$pid}++;
    }

    close $conf_fh or die "Problems closing configuration file: $!";

    # Cleanup the last few child processes as they finish.
    wait_for_pids(keys %pids);
    return;
}

# ------------------------------------------------------------------------------
# Example: run a demo subroutine once per line of foo.csv, with at most
# MAX_KIDS children at a time.

my $code = sub {
    print "$$ started. Got arguments: ", "@_", "\n";
    sleep 2;
    print "Foobar!\n";
    exit 0;
};

parallel_update(
    max_kids => MAX_KIDS,
    config   => "foo.csv",
    code     => $code,
);

__END__
Example contents of foo.csv (each line is one comma-separated argument list):
a,b
c,d
e,f
g,h
i,j
k,l
m,n
o,p
q,r
s,t
u,v
w,y
x,z
a1,b1
c1,d1
e1,f1
g1,h1
i1,j1
k1,l1
m1,n1
o1,p1
q1,r1
s1,t1
u1,v1
w1,y1
x1,z1
a2,b2
c2,d2
e2,f2
g2,h2
i2,j2
k2,l2
m2,n2
o2,p2
q2,r2
s2,t2
u2,v2
w2,y2
x2,z2