Totally understood, F-Monk; some folks just don't like documented code. So, for those advanced programmers, like F-Monk, here you go. However, I think the point of documented code is to be explicit about what one is doing and why one is doing it, especially when one is asking for comments and suggestions.
#!/usr/bin/perl
use strict;
use warnings;
require 5.8.9;
our $VERSION='2011-01-07';
use threads 1.39;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw( usleep time );
use IPC::Open3;
use POSIX qw(:errno_h :sys_wait_h);
use FileHandle;
## ---------------------------------------------------------------------
## Tunables plus the return/exit codes used throughout the script.
## ---------------------------------------------------------------------
my $DEFAULTS = {
    exit_success => 0,      ## Process exit code: success.
    exit_failure => 1,      ## Process exit code: failure.
    ret_success  => 1,      ## Subroutine return value: success.
    ret_failure  => 0,      ## Subroutine return value: failure.
    debug        => 1,      ## True: is_pid_alive() queues its diagnostics.
    random       => 5,      ## Upper bound given to rand() for the sleep length.
    pid_timeout  => 3,      ## Seconds a child may run before it is killed.
    max_threads  => 20,     ## Number of worker threads to create.
    test_jobs    => 100,    ## Number of test jobs to queue.
};

## ---------------------------------------------------------------------
## State shared between threads.
## ---------------------------------------------------------------------
my $l_DETACHING  :shared;        ## Lock target guarding against double detach.
my $f_TERM       :shared = 0;    ## Set to stop the dispatcher and the workers.
my $f_TERM_PRINT :shared = 0;    ## Set to stop the print manager.
my $f_TERM_TIMER :shared = 0;    ## Set to stop the timer thread.

## Queue of worker thread ids that are waiting for a job.
my $q_IDLE = Thread::Queue->new();

## Queue of messages waiting to be written by the print manager.
my $q_PRINT :shared;
$q_PRINT = Thread::Queue->new();

## Per-worker child process bookkeeping: {worker}{<tid>}{pid, pid_start}.
my %ACTIVE_PIDS :shared;
$ACTIVE_PIDS{'worker'} = shared_clone( {} );

## Counters of finished child processes.
my %COMPLETE :shared;
@COMPLETE{ 'success', 'failure' } = ( 0, 0 );

## Unbuffered output on both standard streams.
STDOUT->autoflush(1);
STDERR->autoflush(1);

## INT and TERM share one handler: raise the termination flag and put -1
## at the head of the idle queue so the dispatcher's blocking dequeue
## returns a negative id and leaves its loop.
my $on_terminate = sub {
    print STDOUT ">>> Terminating <<<\n";
    $f_TERM = 1;
    $q_IDLE->insert( 0, -1 );
};
$SIG{'INT'}  = $on_terminate;
$SIG{'TERM'} = $on_terminate;
## print_manager()
##
## Thread body that drains $q_PRINT to STDOUT, prefixing every message
## with a timestamp. While $f_TERM_PRINT is false it wakes every half
## second and writes at most five messages per wake-up; once the flag is
## raised it waits one more interval, writes whatever is still pending
## and ends the thread.
##
## Takes no arguments and does not return (ends in threads->exit()).
##
## Note: the former ':locked' sub attribute was dropped. It has been a
## no-op for a long time and is a syntax error from perl 5.28 onwards.
sub print_manager {
    ## Write up to $limit pending messages (all of them when $limit is
    ## omitted). pending() is read once so the count passed to dequeue()
    ## never exceeds what is actually queued and the call cannot block.
    my $flush = sub {
        my ($limit) = @_;
        my $pending = $q_PRINT->pending();
        return if !$pending;    ## Nothing queued (0) or queue ended (undef).

        my $count = ( defined $limit && $pending > $limit ) ? $limit
                                                            : $pending;
        my $time = localtime time;
        for my $message ( $q_PRINT->dequeue($count) ) {
            print STDOUT '[', $time, ']: ', $message, "\n";
        }
        return;
    };

    while ( !$f_TERM_PRINT ) {
        usleep( 500_000 );
        $flush->(5);
    } ## end while.

    ## Final pass: give late writers one interval, then empty the queue.
    usleep( 500_000 );
    $flush->();

    threads->exit();
} ## end print_manager().
## thread_worker( $q_work )
##
## Body of one worker thread. Each cycle announces the thread as idle,
## registers its id on $q_IDLE, then blocks on its private work queue.
## An empty string or -1 is the stop sentinel and returns ret_failure
## immediately; any other item is passed to get_information(). The loop
## ends after a job when $f_TERM has been raised.
##
##   $q_work - Thread::Queue from which this worker receives its jobs.
sub thread_worker {
    my ($q_work) = @_;
    my $tid = threads->tid();

    WORK_CYCLE:
    while (1) {
        $q_PRINT->enqueue( "Thread $tid --> IDLE" );
        $q_IDLE->enqueue($tid);

        my $work = $q_work->dequeue();
        return $DEFAULTS->{'ret_failure'}
            if ( $work eq q{} ) || ( $work eq q{-1} );

        $q_PRINT->enqueue( "Thread $tid <-- WORKING [$work]" );
        get_information( $work, $tid );

        last WORK_CYCLE if $f_TERM;
    } ## end WORK_CYCLE.

    $q_PRINT->enqueue( "Thread $tid --> FINISHED" );
} ## end thread_worker().
## thread_timer()
##
## Watchdog thread body. Every quarter second, while $f_TERM_TIMER is
## false, it walks the per-worker records in %ACTIVE_PIDS and sends
## signal 9 to any child process that has been running longer than
## $DEFAULTS->{'pid_timeout'} seconds and is still reported alive by
## is_pid_alive().
##
## Takes no arguments and does not return (ends in threads->exit()).
##
## Note: the former ':locked' sub attribute was dropped. It has been a
## no-op for a long time and is a syntax error from perl 5.28 onwards.
sub thread_timer {
    while ( !$f_TERM_TIMER ) {
        usleep( 250_000 );
        {
            ## Held only for this scan; released before the next sleep.
            lock %ACTIVE_PIDS;

            TID:
            foreach my $tid ( keys %{ $ACTIVE_PIDS{'worker'} } ) {
                my $pid       = $ACTIVE_PIDS{'worker'}{$tid}{'pid'};
                my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'};

                ## Skip incomplete records and records reset to -1 by
                ## get_information() after the child was reaped.
                next TID if !defined $pid || !defined $pid_start;
                next TID if $pid < 0 || $pid_start < 0;

                my $runtime = time - $pid_start;
                next TID if $runtime <= $DEFAULTS->{'pid_timeout'};
                next TID if !is_pid_alive($pid);

                $q_PRINT->enqueue( "Thread $tid has exceeded maximum " .
                                   "runtime for process $pid. Current " .
                                   "runtime: $runtime" );
                kill( 9, $pid );
            } ## end foreach.
        } ## end lock block.
    } ## end while.

    {
        ## Detach only if nobody has done so already, then end the thread.
        lock $l_DETACHING;
        if ( !threads->is_detached() ) { threads->detach(); }
        threads->exit();
    }
} ## end thread_timer().
## is_pid_alive( $pid )
##
## Probe a process with signal 0 (nothing is delivered; only the
## permission/existence check is performed).
##
##   $pid - process id to probe; must be a positive integer.
##
## Returns $DEFAULTS->{'ret_success'} when the probe succeeds or fails
## with EPERM (the process exists but may not be signalled), and
## $DEFAULTS->{'ret_failure'} for ESRCH, any other error, or an invalid
## pid. When $DEFAULTS->{'debug'} is true the verdict is queued on
## $q_PRINT.
##
## The '($)' prototype was removed: it is declared after the call site
## in thread_timer(), so it only produced a "called too early to check
## prototype" warning and validated nothing.
sub is_pid_alive {
    my ($pid) = @_;
    my $msg    = q{};
    my $status = $DEFAULTS->{'ret_failure'};

    if ( !defined $pid || $pid !~ m{\A [1-9][0-9]* \z}xms ) {
        ## Guard: an undef or 0 pid would make kill() address the
        ## caller's own process group and report it as "alive".
        $msg = 'Process id is missing or not a positive integer';
    } elsif ( kill( 0, $pid ) ) {          ## Still alive.
        $msg    = "Process $pid is still running";
        $status = $DEFAULTS->{'ret_success'};
    } elsif ( $! == EPERM ) {              ## Changed UID.
        $msg    = "Process $pid is changed UID";
        $status = $DEFAULTS->{'ret_success'};
    } elsif ( $! == ESRCH ) {              ## No such process.
        $msg = "Process $pid is died or zombied";
    } else {                               ## Could not locate.
        $msg = "Process $pid search failure";
    }

    if ( $DEFAULTS->{'debug'} ) { $q_PRINT->enqueue( "$msg" ); }
    return $status;
} ## end is_pid_alive.
## get_information( $job, $tid )
##
## Run one job in a child process and wait for it to finish. The child
## is '/bin/sleep' with a random whole number of seconds below
## $DEFAULTS->{'random'}. While the child runs, its pid and start time
## are published in %ACTIVE_PIDS under the worker's thread id; the
## record is reset to -1/-1 afterwards.
##
##   $job - job label, used only in the failure message.
##   $tid - id of the calling worker thread.
##
## Returns $DEFAULTS->{'ret_success'} when the child exits with status
## 0, otherwise $DEFAULTS->{'ret_failure'} (including when the child
## could not be started). %COMPLETE is incremented for every child that
## was actually waited for.
##
## The '($$)' prototype was removed: it is declared after the call site
## in thread_worker(), so it only produced a "called too early to check
## prototype" warning and validated nothing.
sub get_information {
    my ( $job, $tid ) = @_;
    local ( *w_IN, *w_OUT, *w_ERR );

    ## Close whichever pipes were opened and mark this worker as having
    ## no running child.
    my $cleanup = sub {
        for my $handle ( \*w_IN, \*w_OUT, \*w_ERR ) {
            ## 'defined', not truth: descriptor 0 is a valid descriptor.
            close $handle if defined fileno $handle;
        }
        {
            lock %ACTIVE_PIDS;
            ## Replace the whole record with a shared one. Assigning to
            ## ...{$tid}{'pid'} when no record exists yet (open3 failed
            ## on this thread's first job) would autovivify an unshared
            ## hash inside a shared hash, which threads::shared rejects.
            $ACTIVE_PIDS{'worker'}{$tid} = shared_clone(
                { 'pid'       => -1,
                  'pid_start' => -1 } );
        }
        return;
    };

    my $pid = eval {
        my $val = int( rand( $DEFAULTS->{'random'} ) );
        open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep', $val );
    };
    if ( ( $@ ) || ( !defined $pid ) ) {
        $q_PRINT->enqueue( "Thread $tid failed open for job $job" );
        $cleanup->();
        return $DEFAULTS->{'ret_failure'};
    }

    ## Publish the child so thread_timer() can enforce the timeout.
    {
        lock %ACTIVE_PIDS;
        $ACTIVE_PIDS{'worker'}{$tid} = shared_clone(
            { 'pid'       => $pid,
              'pid_start' => time } );
    }

    waitpid( $pid, 0 );
    my $pid_exit_status = ${^CHILD_ERROR_NATIVE};
    my $succeeded       = ( $pid_exit_status == 0 );

    $q_PRINT->enqueue( ( $succeeded ? q{+} : q{-} ) .
                       " Thread $tid process $pid " .
                       "exit $pid_exit_status" );
    $cleanup->();
    {
        lock %COMPLETE;
        $COMPLETE{ $succeeded ? 'success' : 'failure' }++;
    }

    return $succeeded ? $DEFAULTS->{'ret_success'}
                      : $DEFAULTS->{'ret_failure'};
} ## end get_information().
## shutdown_engine()
##
## Orderly stop of the helper threads. It prunes the per-worker records
## in %ACTIVE_PIDS until none refers to a running child, then raises
## $f_TERM_TIMER, waits for $q_PRINT to drain and raises $f_TERM_PRINT.
##
## Takes no arguments. Returns $DEFAULTS->{'ret_success'}.
##
## A record is kept only while BOTH 'pid' and 'pid_start' are defined
## and non-negative, the same rule thread_timer() applies. The previous
## '||' test compared an undefined field numerically (warning) and
## could leave a half-populated record in place forever, so the loop
## never ended.
sub shutdown_engine {
    $q_PRINT->enqueue( "Engine shutdown process started" );

    ## Drop finished/invalid records and report how many remain; both
    ## steps happen under one lock so the count matches the pruning.
    my $prune_and_count = sub {
        lock %ACTIVE_PIDS;
        foreach my $tid ( keys %{ $ACTIVE_PIDS{'worker'} } ) {
            my $pid       = $ACTIVE_PIDS{'worker'}{$tid}{'pid'};
            my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'};

            my $still_running =    defined $pid
                                && defined $pid_start
                                && $pid >= 0
                                && $pid_start >= 0;
            if ( !$still_running ) {
                delete( $ACTIVE_PIDS{'worker'}{$tid} );
            }
        } ## end foreach active pid.
        return scalar( keys %{ $ACTIVE_PIDS{'worker'} } );
    };

    while ( $prune_and_count->() > 0 ) {
        usleep( 250_000 );
    } ## end while active count.

    ## Stop the timer first, then let the print manager empty its queue.
    $f_TERM_TIMER = 1;
    usleep( 250_000 );
    while ( $q_PRINT->pending() > 0 ) { usleep( 250_000 ); }
    $f_TERM_PRINT = 1;

    return $DEFAULTS->{'ret_success'};
} ## end shutdown_engine().
## Dispatcher: queue the test jobs, start the helper and worker threads,
## hand one job to each worker that reports itself idle, then stop the
## workers and wait for them before shutting the helpers down.
MAIN:
{
    ## Backlog of job labels "JOB-1" .. "JOB-<test_jobs>".
    my $q_jobs = Thread::Queue->new();
    foreach my $number ( 1 .. $DEFAULTS->{'test_jobs'} ) {
        $q_jobs->enqueue( q{JOB-} . $number );
    }

    ## Helper threads run detached; they are stopped via their flags.
    threads->create( \&print_manager )->detach();
    threads->create( \&thread_timer )->detach();

    ## One private work queue per worker, keyed by the worker's tid.
    my %work_queues;
    foreach my $slot ( 1 .. $DEFAULTS->{'max_threads'} ) {
        my $q_work = Thread::Queue->new();
        my $thr    = threads->create( \&thread_worker, $q_work );
        $work_queues{ $thr->tid() } = $q_work;
    } ## end foreach slot.

    DISPATCH:
    until ($f_TERM) {
        usleep( 250_000 );
        $q_PRINT->enqueue( "Active threads " . int( threads->list() ) );

        ## Blocks until a worker is idle; a negative id is the signal
        ## handler's request to stop.
        my $tid = $q_IDLE->dequeue();
        last DISPATCH if ( $tid < 0 );

        if ( !( $q_jobs->pending() > 0 ) ) {
            $q_PRINT->enqueue( 'No More Jobs to Submit' );
            $f_TERM = 1;
            next DISPATCH;
        }

        my $work = $q_jobs->dequeue_nb();
        $work_queues{$tid}->enqueue( $work );
    } ## end DISPATCH.

    ## Release every worker still blocked on its queue, then reap them.
    foreach my $tid ( keys %work_queues ) {
        $work_queues{$tid}->enqueue(-1);
    }
    foreach my $thr ( threads->list() ) {
        $thr->join();
    }
} ## end MAIN.

shutdown_engine();

## Summary of finished child processes.
my $rule = '*' x 60;
print $rule, "\n";
print 'Success: ', $COMPLETE{'success'}, "\n";
print 'Failure: ', $COMPLETE{'failure'}, "\n";
print $rule, "\n";
print "Done\n";

exit $DEFAULTS->{'exit_success'};