Beefy Boxes and Bandwidth Generously Provided by pair Networks
Keep It Simple, Stupid
 
PerlMonks  

Re^2: RFC: Threads, Pooling, Queues, Timers and More...

by DeadPoet (Scribe)
on Jan 08, 2011 at 02:12 UTC ( #881190=note: print w/ replies, xml ) Need Help??


in reply to Re: RFC: Threads, Pooling, Queues, Timers and More...
in thread RFC: Threads, Pooling, Queues, Timers and More...

Totally understand, F-Monk; some folks just don't like documented code. So, for those advanced programmers like F-Monk, here you go. However, I think the point of documented code is to be explicit about what one is doing and why one is doing it, especially when one is asking for comments and suggestions.

#!/usr/bin/perl
use 5.008009;    ## Checked at compile time, before the modules below load.
use strict;
use warnings;

our $VERSION = '2011-01-07';

use threads 1.39;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw( usleep time );
use IPC::Open3;
use POSIX qw(:errno_h :sys_wait_h);
use FileHandle;

## Tunables and the return/exit codes used throughout the script.
my $DEFAULTS = {
    'exit_success' => 0,      ## success exit.
    'exit_failure' => 1,      ## failure exit.
    'ret_success'  => 1,      ## success return.
    'ret_failure'  => 0,      ## failure return.
    'debug'        => 1,      ## debug.
    'random'       => 5,      ## Random amount of time to sleep.
    'pid_timeout'  => 3,      ## Maximum number of seconds for a
                              ## pid to run.
    'max_threads'  => 20,     ## Maximum number of threads to
                              ## maintain.
    'test_jobs'    => 100,    ## Number of test jobs to run.
};

my $l_DETACHING  :shared;        ## Prevent double detaching.
my $f_TERM       :shared = 0;    ## Main termination flag.
my $f_TERM_PRINT :shared = 0;    ## Print Manager termination flag.
my $f_TERM_TIMER :shared = 0;    ## Timer termination flag.

## Workers announce themselves here (by thread id) when they are idle.
my $q_IDLE = Thread::Queue->new();

## Every thread funnels its output through this queue; only
## print_manager() writes the messages to STDOUT.
my $q_PRINT :shared;
$q_PRINT = Thread::Queue->new();

## Per-worker child process bookkeeping: {worker}{tid}{pid, pid_start}.
my %ACTIVE_PIDS :shared;
$ACTIVE_PIDS{'worker'} = shared_clone( {} );

## Job outcome counters.
my %COMPLETE :shared;
$COMPLETE{'success'} = 0;
$COMPLETE{'failure'} = 0;

*STDOUT->autoflush();
*STDERR->autoflush();

## On INT/TERM raise the termination flag and push the -1 sentinel to the
## front of the idle queue so a dispatcher blocked on it wakes up.
$SIG{'INT'} = $SIG{'TERM'} = sub {
    print STDOUT q{>>> Terminating <<<} . "\n";
    $f_TERM = 1;
    $q_IDLE->insert( 0, -1 );
};

## _flush_print_queue( [$limit] )
##
## Prints queued messages to STDOUT, each prefixed with one shared
## timestamp.  At most $limit messages are printed; with no $limit
## everything currently pending is printed.  Does nothing when the queue
## is empty.  The non-blocking dequeue guarantees the caller never stalls
## if the queue holds fewer items than pending() reported.
sub _flush_print_queue {
    my ($limit) = @_;

    my $pending = $q_PRINT->pending();
    return if !$pending;

    my $count = ( defined $limit && $pending > $limit ) ? $limit : $pending;
    my $stamp = localtime( time() );

    for my $message ( $q_PRINT->dequeue_nb($count) ) {
        print STDOUT '[', $stamp, ']: ', $message, "\n";
    }

    return;
}    ## end _flush_print_queue().

## print_manager()
##
## Thread entry point.  Every half second prints up to five queued
## messages until $f_TERM_PRINT is set, then waits one more interval,
## prints whatever is left and exits the thread.
##
## The obsolete ':locked' sub attribute was dropped: it had long been a
## no-op and current perls refuse to compile it.
sub print_manager {
    while ( !$f_TERM_PRINT ) {
        usleep(500_000);
        _flush_print_queue(5);
    }    ## end while.

    ## Final drain after the termination flag was raised.
    usleep(500_000);
    _flush_print_queue();

    threads->exit();
}    ## end print_manager().
## thread_worker( $q_work )
##
## Worker thread entry point.  Repeatedly reports itself idle on $q_IDLE,
## waits for a job on its private queue $q_work and runs it through
## get_information().  An empty (or undefined) job or the sentinel -1
## ends the thread with ret_failure; otherwise the loop ends once $f_TERM
## is set and ret_success is returned.
sub thread_worker {
    my ($q_work) = @_;
    my $tid = threads->tid();

    do {
        $q_PRINT->enqueue("Thread $tid --> IDLE");
        $q_IDLE->enqueue($tid);

        my $work = $q_work->dequeue();
        if ( ( !defined $work ) || ( $work eq q{} ) || ( $work eq q{-1} ) ) {
            return $DEFAULTS->{'ret_failure'};
        }

        $q_PRINT->enqueue("Thread $tid <-- WORKING [$work]");
        get_information( $work, $tid );
    } while ( !$f_TERM );

    $q_PRINT->enqueue("Thread $tid --> FINISHED");
    return $DEFAULTS->{'ret_success'};
}    ## end thread_worker().

## thread_timer()
##
## Watchdog thread entry point.  Every quarter second scans the
## registered worker processes and sends signal 9 to any that is still
## alive after more than pid_timeout seconds.  Runs until $f_TERM_TIMER
## is set, then detaches (if not already detached) and exits.
##
## The obsolete ':locked' sub attribute was dropped: it had long been a
## no-op and current perls refuse to compile it.
sub thread_timer {
    while ( !$f_TERM_TIMER ) {
        usleep(250_000);

        {
            lock %ACTIVE_PIDS;

          WORKER:
            foreach my $tid ( keys %{ $ACTIVE_PIDS{'worker'} } ) {
                my $pid       = $ACTIVE_PIDS{'worker'}{$tid}{'pid'};
                my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'};

                ## Skip incomplete records and workers with no running
                ## process (-1 marks "nothing running").
                next WORKER if !defined $pid || !defined $pid_start;
                next WORKER if $pid < 0 || $pid_start < 0;

                my $runtime = time() - $pid_start;
                next WORKER if $runtime <= $DEFAULTS->{'pid_timeout'};
                next WORKER if !is_pid_alive($pid);

                $q_PRINT->enqueue( "Thread $tid has exceeded maximum "
                      . "runtime for process $pid. Current "
                      . "runtime: $runtime" );
                kill( 9, $pid );
            }    ## end foreach.
        }    ## end lock block.
    }    ## end while.

    {
        lock $l_DETACHING;
        if ( !threads->is_detached() ) {
            threads->detach();
        }
        threads->exit();
    }
}    ## end thread_timer().

## is_pid_alive( $pid )
##
## Probes $pid with signal 0.  Returns ret_success when the signal could
## be delivered or was refused with EPERM, and ret_failure on ESRCH or
## any other error.  The verdict is also queued for printing when debug
## is enabled.
##
## No prototype: the sub is called before it is defined, so a prototype
## could not be applied and only produced a warning.
sub is_pid_alive {
    my ($pid) = @_;

    my $msg    = q{};
    my $status = $DEFAULTS->{'ret_success'};

    if ( kill( 0, $pid ) ) {    ## Still alive.
        $msg    = "Process $pid is still running";
        $status = $DEFAULTS->{'ret_success'};
    }
    elsif ( $! == EPERM ) {     ## Changed UID.
        $msg    = "Process $pid is changed UID";
        $status = $DEFAULTS->{'ret_success'};
    }
    elsif ( $! == ESRCH ) {     ## Died or zombied.
        $msg    = "Process $pid is died or zombied";
        $status = $DEFAULTS->{'ret_failure'};
    }
    else {                      ## Could not locate.
        $msg    = "Process $pid search failure";
        $status = $DEFAULTS->{'ret_failure'};
    }

    if ( $DEFAULTS->{'debug'} ) {
        $q_PRINT->enqueue("$msg");
    }

    return $status;
}    ## end is_pid_alive.

## get_information( $job, $tid )
##
## Runs one job for worker thread $tid: starts '/bin/sleep' for a random
## whole number of seconds below $DEFAULTS->{random}, registers the child
## in %ACTIVE_PIDS so thread_timer() can watch it, waits for it and
## records the outcome in %COMPLETE.  Returns ret_success when the child
## exits with status 0, ret_failure otherwise (including a failed spawn,
## which is reported but not counted in %COMPLETE).
##
## No prototype: the sub is called before it is defined, so a prototype
## could not be applied and only produced a warning.
sub get_information {
    my ( $job, $tid ) = @_;

    local ( *w_IN, *w_OUT, *w_ERR );

    ## Closes whichever pipes were opened and marks this worker as having
    ## no running process.  The whole record is replaced with a shared
    ## clone so this also works when the worker was never registered
    ## (spawn failure on its first job) instead of depending on
    ## autovivification inside the shared hash.
    my $cleanup = sub {
        close(w_IN)  if defined fileno(w_IN);
        close(w_OUT) if defined fileno(w_OUT);
        close(w_ERR) if defined fileno(w_ERR);

        lock %ACTIVE_PIDS;
        $ACTIVE_PIDS{'worker'}{$tid}
            = shared_clone( { 'pid' => -1, 'pid_start' => -1 } );
        return;
    };

    my $pid = eval {
        my $seconds = int( rand( $DEFAULTS->{'random'} ) );
        open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep', $seconds );
    };

    if ( ($@) || ( !defined $pid ) ) {
        $q_PRINT->enqueue("Thread $tid failed open for job $job");
        $cleanup->();
        return $DEFAULTS->{'ret_failure'};
    }

    {
        lock %ACTIVE_PIDS;
        $ACTIVE_PIDS{'worker'}{$tid}
            = shared_clone( { 'pid' => $pid, 'pid_start' => time() } );
    }

    waitpid( $pid, 0 );
    my $pid_exit_status = ${^CHILD_ERROR_NATIVE};

    if ( $pid_exit_status != 0 ) {
        $q_PRINT->enqueue(
            "- Thread $tid process $pid " . "exit $pid_exit_status" );
        $cleanup->();
        {
            lock %COMPLETE;
            $COMPLETE{'failure'}++;
        }
        return $DEFAULTS->{'ret_failure'};
    }

    $q_PRINT->enqueue(
        "+ Thread $tid process $pid " . "exit $pid_exit_status" );
    {
        lock %COMPLETE;
        $COMPLETE{'success'}++;
    }
    $cleanup->();

    return $DEFAULTS->{'ret_success'};
}    ## end get_information().

## shutdown_engine()
##
## Orderly shutdown: polls %ACTIVE_PIDS until every worker record has
## been retired, stops the timer thread, waits for the print queue to
## drain and finally stops the print manager.  Returns ret_success.
sub shutdown_engine {
    $q_PRINT->enqueue("Engine shutdown process started");

    my $active_count = scalar keys %{ $ACTIVE_PIDS{'worker'} };

    while ( $active_count > 0 ) {
        {
            lock %ACTIVE_PIDS;
            foreach my $tid ( keys %{ $ACTIVE_PIDS{'worker'} } ) {
                my $pid       = $ACTIVE_PIDS{'worker'}{$tid}{'pid'};
                my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'};

                ## A record counts as active only when BOTH fields are
                ## present and non-negative.  Requiring both keeps a
                ## half-filled record from being compared while undefined
                ## and from stalling this loop forever.
                my $is_active = defined $pid
                    && defined $pid_start
                    && $pid >= 0
                    && $pid_start >= 0;

                delete $ACTIVE_PIDS{'worker'}{$tid} if !$is_active;
            }    ## end foreach active pid.
        }    ## end lock block.

        usleep(250_000);
        $active_count = scalar keys %{ $ACTIVE_PIDS{'worker'} };
    }    ## end while active count.

    $f_TERM_TIMER = 1;
    usleep(250_000);

    ## Let the print manager empty the queue before telling it to stop.
    while ( $q_PRINT->pending() > 0 ) {
        usleep(250_000);
    }
    $f_TERM_PRINT = 1;

    return $DEFAULTS->{'ret_success'};
}    ## end shutdown_engine().

MAIN:
{
    ## Build the list of test jobs.
    my $q_jobs = Thread::Queue->new();
    for my $number ( 1 .. $DEFAULTS->{'test_jobs'} ) {
        $q_jobs->enqueue( q{JOB-} . $number );
    }

    ## Helper threads are detached, so the join loop below only ever sees
    ## the workers.
    threads->create( \&print_manager )->detach();
    threads->create( \&thread_timer )->detach();

    ## One private work queue per worker, keyed by thread id.
    my %work_queues;
    for ( 1 .. $DEFAULTS->{'max_threads'} ) {
        my $q_work = Thread::Queue->new();
        my $thr    = threads->create( \&thread_worker, $q_work );
        $work_queues{ $thr->tid() } = $q_work;
    }    ## end for max_threads.

    ## Dispatcher: hand the next job to whichever worker reports idle.
    while ( !$f_TERM ) {
        usleep(250_000);
        $q_PRINT->enqueue( "Active threads " . scalar( threads->list() ) );

        my $tid = $q_IDLE->dequeue();
        last if ( $tid < 0 );    ## Sentinel from the signal handler.

        if ( $q_jobs->pending() > 0 ) {
            my $work = $q_jobs->dequeue_nb();
            $work_queues{$tid}->enqueue($work);
        }
        else {
            $q_PRINT->enqueue('No More Jobs to Submit');
            $f_TERM = 1;
        }    ## end if pending jobs.
    }    ## end while ! f_TERM.

    ## Release every worker still waiting on its queue, then collect them.
    $work_queues{$_}->enqueue(-1) foreach keys(%work_queues);
    $_->join() foreach threads->list();
}    ## end MAIN.

shutdown_engine();

print '*' x 60, "\n";
print 'Success: ', $COMPLETE{'success'}, "\n";
print 'Failure: ', $COMPLETE{'failure'}, "\n";
print '*' x 60, "\n";
print("Done\n");

exit $DEFAULTS->{'exit_success'};


Comment on Re^2: RFC: Threads, Pooling, Queues, Timers and More...
Download Code

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://881190]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others avoiding work at the Monastery: (14)
As of 2015-07-31 12:50 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    The top three priorities of my open tasks are (in descending order of likelihood to be worked on) ...









    Results (277 votes), past polls