Beefy Boxes and Bandwidth Generously Provided by pair Networks
Syntactic Confectionery Delight
 
PerlMonks  

Re^2: RFC: Threads, Pooling, Queues, Timers and More...

by DeadPoet (Scribe)
on Jan 08, 2011 at 02:12 UTC ( #881190=note: print w/ replies, xml ) Need Help??


in reply to Re: RFC: Threads, Pooling, Queues, Timers and More...
in thread RFC: Threads, Pooling, Queues, Timers and More...

Totally understand F-Monk, some folks just don't like documented code. So, for those advanced programmers, like F-Monk, here you go. However, I think the point of documented code it to be explicit about what one is doing, and why they are doing it. Especially, when one is asking for comment and suggestions.

#!/usr/bin/perl use strict; use warnings; require 5.8.9; our $VERSION='2011-01-07'; use threads 1.39; use threads::shared; use Thread::Queue; use Time::HiRes qw( usleep time ); use IPC::Open3; use POSIX qw(:errno_h :sys_wait_h); use FileHandle; my $DEFAULTS = { 'exit_success' => 0, ## success exit. 'exit_failure' => 1, ## failure exit. 'ret_success' => 1, ## success return. 'ret_failure' => 0, ## failure return. 'debug' => 1, ## debug. 'random' => 5, ## Random amount of time to sleep. 'pid_timeout' => 3, ## Maximum number of seconds for a ## pid to run. 'max_threads' => 20, ## Maximum number of threads to ## maintain. 'test_jobs' => 100, ## Number of test jobs to run. }; my $l_DETACHING :shared; ## Prevent double detaching. my $f_TERM :shared = 0; ## Main termination flag. my $f_TERM_PRINT :shared = 0; ## Print Manager termination flag. my $f_TERM_TIMER :shared = 0; ## Timer termination flag. my $q_IDLE = Thread::Queue->new(); my $q_PRINT :shared; $q_PRINT = Thread::Queue->new(); my %ACTIVE_PIDS :shared; $ACTIVE_PIDS{'worker'} = shared_clone({}); my %COMPLETE :shared; $COMPLETE{'success'} = 0; $COMPLETE{'failure'} = 0; *STDOUT->autoflush(); *STDERR->autoflush(); $SIG{'INT'} = $SIG{'TERM'} = sub { print STDOUT q{>>> Terminating <<<}."\n"; $f_TERM = 1; $q_IDLE->insert(0, -1); }; sub print_manager :locked { while ( ( ! $f_TERM_PRINT ) ) { usleep( 500_000 ); if ( $q_PRINT->pending() > 0 ) { my $time = localtime time; for ( $q_PRINT->dequeue( ( $q_PRINT->pending() > 5 ) ? 5 : $q_PRINT->pending() ) ) { print STDOUT '[', $time, ']: ', $_, "\n"; } ## end for print. } ## end if pending. } ## end while. usleep( 500_000 ); if ( $q_PRINT->pending() > 0 ) { my $time = localtime time; for ( $q_PRINT->dequeue( $q_PRINT->pending() ) ) { print STDOUT '[', $time, ']: ', $_, "\n"; } ## end for print. } ## end if pending. threads->exit(); } ## end print_manager(). sub thread_worker { my ($q_work) = @_; my $tid = threads->tid(); do { $q_PRINT->enqueue( "Thread $tid --> IDLE" ); $q_IDLE->enqueue($tid); my $work = $q_work->dequeue(); if ( ( $work eq q{} ) || ( $work eq q{-1} ) ) { return $DEFAULTS->{'ret_failure'}; } $q_PRINT->enqueue( "Thread $tid <-- WORKING [$work]" ); get_information( $work, $tid ); } while (! $f_TERM); $q_PRINT->enqueue( "Thread $tid --> FINISHED" ); } ## end thread_worker(). sub thread_timer :locked { while ( ! $f_TERM_TIMER ) { usleep( 250_000 ); { lock %ACTIVE_PIDS; foreach my $tid ( keys %{$ACTIVE_PIDS{'worker'}} ) { if ( ( ! defined $ACTIVE_PIDS{'worker'}{$tid}{'pid'} ) || ( ! defined $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'} ) ) { next; } my $pid = $ACTIVE_PIDS{'worker'}{$tid}{'pid'}; my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'}; if ( ( $pid < 0 ) || ( $pid_start < 0 ) ) { next; } my $runtime = ( time - $pid_start ); if ( $runtime > $DEFAULTS->{'pid_timeout'} ) { if ( is_pid_alive( $pid ) ) { $q_PRINT->enqueue( "Thread $tid has exceeded maximum " . "runtime for process $pid. Current " . "runtime: $runtime" ); kill( 9, $pid ); } ## end is_pid_alive. } ## end if timeout. } ## end foreach. } ## end lock block. } ## end while. { lock $l_DETACHING; if ( ! threads->is_detached() ) { threads->detach(); }; threads->exit(); } } ## end thread_timer(). sub is_pid_alive($) { my $pid = shift; my $msg = q{}; my $status = $DEFAULTS->{'ret_success'}; if ( kill(0, $pid) ) { ## Still alive. $msg = "Process $pid is still running"; $status = $DEFAULTS->{'ret_success'}; } elsif ( $! == EPERM ) { ## Changed UID. $msg = "Process $pid is changed UID"; $status = $DEFAULTS->{'ret_success'}; } elsif ( $! == ESRCH ) { ## Died or zombied. $msg = "Process $pid is died or zombied"; $status = $DEFAULTS->{'ret_failure'}; } else { ## Could not locate. $msg = "Process $pid search failure"; $status = $DEFAULTS->{'ret_failure'}; } if ( $DEFAULTS->{'debug'} ) { $q_PRINT->enqueue( "$msg"); } return $status; } ## end is_pid_alive. sub get_information($$) { my ( $job, $tid ) = @_; local ( *w_IN, *w_OUT, *w_ERR ); my $pid; my $cleanup = sub { w_IN->close if ( fileno(w_IN) ); w_OUT->close if ( fileno(w_OUT) ); w_ERR->close if ( fileno(w_ERR) ); { lock %ACTIVE_PIDS; $ACTIVE_PIDS{'worker'}{$tid}{'pid'} = -1; $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'} = -1; } }; $pid = eval { my $val = int( rand($DEFAULTS->{'random'}) ); open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep', $val ); }; if ( ( $@ ) || ( ! defined $pid ) ) { $q_PRINT->enqueue( "Thread $tid failed open for job $job" ); $cleanup->(); return $DEFAULTS->{'ret_failure'}; } { lock %ACTIVE_PIDS; $ACTIVE_PIDS{'worker'}{$tid} = shared_clone ( { 'pid' => $pid, 'pid_start' => time } ); } waitpid( $pid, 0 ); my $pid_exit_status = ${^CHILD_ERROR_NATIVE}; if ( $pid_exit_status != 0 ) { $q_PRINT->enqueue( "- Thread $tid process $pid " . "exit $pid_exit_status" ); $cleanup->(); { lock %COMPLETE; $COMPLETE{'failure'}++; } return $DEFAULTS->{'ret_failure'}; } $q_PRINT->enqueue( "+ Thread $tid process $pid " . "exit $pid_exit_status" ); { lock %COMPLETE; $COMPLETE{'success'}++; } $cleanup->(); $DEFAULTS->{'ret_success'}; } ## end get_Information(). sub shutdown_engine { $q_PRINT->enqueue( "Engine shutdown process started" ); my $active_count = int ( keys %{$ACTIVE_PIDS{'worker'}} ); while ( $active_count > 0 ) { { lock %ACTIVE_PIDS; foreach my $tid ( keys %{$ACTIVE_PIDS{'worker'}} ) { if ( ( defined $ACTIVE_PIDS{'worker'}{$tid}{'pid'} ) || ( defined $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'} ) ) { my $pid = $ACTIVE_PIDS{'worker'}{$tid}{'pid'}; my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'}; if ( ( $pid < 0 ) || ( $pid_start < 0 ) ) { delete ( $ACTIVE_PIDS{'worker'}{$tid} ); } ## end if pid check. } else { delete ( $ACTIVE_PIDS{'worker'}{$tid} ); } ## end if defined. } ## end foreach active pid. } ## end lock block. usleep( 250_000 ); $active_count = int ( keys %{$ACTIVE_PIDS{'worker'}} ); } ## end while active count. $f_TERM_TIMER = 1; usleep( 250_000 ); while ( $q_PRINT->pending() > 0 ) { usleep( 250_000 ); } $f_TERM_PRINT = 1; return $DEFAULTS->{'ret_success'}; } ## end shutdown_engine(). MAIN: { my $q_jobs = Thread::Queue->new(); for ( 1..$DEFAULTS->{'test_jobs'} ) { my $job = q{JOB-}.$_; $q_jobs->enqueue($job); } my $thr_pm = threads->create( \&print_manager )->detach(); my $thr_tm = threads->create( \&thread_timer )->detach(); my %work_queues; for ( 1..$DEFAULTS->{'max_threads'} ) { my $q_work = Thread::Queue->new(); my $thr = threads->create( \&thread_worker, $q_work ); $work_queues{$thr->tid()} = $q_work; } ## end for max_threads. while ( ! $f_TERM ) { usleep( 250_000 ); $q_PRINT->enqueue( "Active threads " . int( threads->list() ) ); my $tid = $q_IDLE->dequeue(); last if ($tid < 0); if ( $q_jobs->pending() > 0 ) { my $work = $q_jobs->dequeue_nb(); $work_queues{$tid}->enqueue( $work ); } else { $q_PRINT->enqueue( 'No More Jobs to Submit' ); $f_TERM = 1; } ## end if pending jobs. } ## end while ! f_TERM. $work_queues{$_}->enqueue(-1) foreach keys(%work_queues); $_->join() foreach threads->list(); } ## end MAIN. shutdown_engine(); print '*' x60, "\n"; print 'Success: ', $COMPLETE{'success'}, "\n"; print 'Failure: ', $COMPLETE{'failure'}, "\n"; print '*' x60, "\n"; print("Done\n"); exit $DEFAULTS->{'exit_success'};


Comment on Re^2: RFC: Threads, Pooling, Queues, Timers and More...
Download Code

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://881190]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others lurking in the Monastery: (8)
As of 2014-12-20 04:47 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    Is guessing a good strategy for surviving in the IT business?





    Results (95 votes), past polls