RFC: Using 'threads' common aspects

by DeadPoet (Scribe)
on Jan 08, 2011 at 12:40 UTC ( #881217=perlmeditation )

There is a copious amount of documentation related to Perl 'threads' scattered across many sites. However, I have not come across any that really pulls together a composite example illustrating the key aspects needed for a typical system-administration-style program. The program listed below attempts to address the following common aspects of using Perl threads:

* Creating and Maintaining a thread pool.

* Thread signal handlers.

* Using shared memory and thread queues.

* Various levels of locking such as variable and subroutine.

* The use of high resolution thread timers for external process timeout and CPU reduction.

* Terminating external processes without terminating the thread stack.

* The use of IPC::Open3 to execute external processes--/bin/sleep in this example.

* Centralized printing by way of a print manager thread.

* Graceful cleanup and shutdown of the program.

What this program is...

This program is fully functional and works properly as is. That is to say, I obtained the expected results.

This program is intended to illustrate commonly needed/used items when using Perl threads.

What this program is not...

This program is not an attempt to have anyone code for me. However, if there is a coding oversight or a better way of doing something, then your suggestions are valued.

This program is not a commentary on coding style or documentation style--this is mainly an uncommented write-up.

#!/usr/bin/perl
use strict;
use warnings;

our $VERSION = '2011-01-08';

use threads 1.39;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw( usleep time );
use IPC::Open3;
use POSIX qw(:errno_h :sys_wait_h);
use FileHandle;

my $DEFAULTS = {
    'exit_success' => 0,  ## success exit.
    'exit_failure' => 1,  ## failure exit.
    'ret_success'  => 1,  ## success return.
    'ret_failure'  => 0,  ## failure return.
    'debug'        => 1,  ## debug.
    'random'       => 5,  ## Random amount of time to sleep.
    'pid_timeout'  => 3,  ## Maximum number of seconds for a
                          ## pid to run.
    'max_threads'  => 10, ## Maximum number of threads to
                          ## maintain.
    'test_jobs'    => 20, ## Number of test jobs to run.
};

my $l_DETACHING  :shared;     ## Prevent double detaching.
my $f_TERM       :shared = 0; ## Main termination flag.
my $f_TERM_PRINT :shared = 0; ## Print Manager termination flag.
my $f_TERM_TIMER :shared = 0; ## Timer termination flag.

my $q_IDLE  = Thread::Queue->new();
my $q_PRINT :shared;
$q_PRINT = Thread::Queue->new();

my %ACTIVE_PIDS :shared;
$ACTIVE_PIDS{'worker'} = shared_clone({});

my %COMPLETE :shared;
$COMPLETE{'success'} = 0;
$COMPLETE{'failure'} = 0;

*STDOUT->autoflush();
*STDERR->autoflush();

local $SIG{'INT'} = local $SIG{'TERM'} = sub {
    print STDOUT q{>>> Terminating <<<}."\n";
    $f_TERM = 1;
    $q_IDLE->insert(0, -1);
};
local $SIG{'KILL'} = sub { threads->exit(); };

sub print_manager :locked {
    while ( ! $f_TERM_PRINT ) {
        usleep( 500_000 );
        if ( $q_PRINT->pending() > 0 ) {
            my $time = localtime time;
            for ( $q_PRINT->dequeue(
                      ( $q_PRINT->pending() > 5 ) ? 5 : $q_PRINT->pending() ) ) {
                print STDOUT '[', $time, ']: ', $_, "\n";
            } ## end for print.
        } ## end if pending.
    } ## end while.

    usleep( 500_000 );
    if ( $q_PRINT->pending() > 0 ) {
        my $time = localtime time;
        for ( $q_PRINT->dequeue( $q_PRINT->pending() ) ) {
            print STDOUT '[', $time, ']: ', $_, "\n";
        } ## end for print.
    } ## end if pending.

    threads->exit();
    return $DEFAULTS->{'ret_success'};
} ## end print_manager().

sub thread_worker {
    my ($q_work) = @_;
    my $tid = threads->tid();

    do {
        $q_PRINT->enqueue( "Thread $tid --> IDLE" );
        $q_IDLE->enqueue($tid);
        my $work = $q_work->dequeue();
        if ( ( $work eq q{} ) || ( $work eq q{-1} ) ) {
            return $DEFAULTS->{'ret_failure'};
        }
        $q_PRINT->enqueue( "Thread $tid <-- WORKING [$work]" );
        get_information( $work, $tid );
    } while ( ! $f_TERM );

    $q_PRINT->enqueue( "Thread $tid --> FINISHED" );
    return $DEFAULTS->{'ret_success'};
} ## end thread_worker().

sub thread_timer :locked {
    while ( ! $f_TERM_TIMER ) {
        usleep( 250_000 );
        {
            lock %ACTIVE_PIDS;
            foreach my $tid ( keys %{$ACTIVE_PIDS{'worker'}} ) {
                if (    ( ! defined $ACTIVE_PIDS{'worker'}{$tid}{'pid'} )
                     || ( ! defined $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'} ) ) {
                    next;
                }
                my $pid       = $ACTIVE_PIDS{'worker'}{$tid}{'pid'};
                my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'};
                if ( ( $pid < 0 ) || ( $pid_start < 0 ) ) { next; }

                my $runtime = ( time - $pid_start );
                if ( $runtime > $DEFAULTS->{'pid_timeout'} ) {
                    if ( is_pid_alive( $pid ) ) {
                        $q_PRINT->enqueue( "Thread $tid has exceeded maximum "
                                         . "runtime for process $pid. Current "
                                         . "runtime: $runtime" );
                        kill( 9, $pid );
                    } ## end is_pid_alive.
                } ## end if timeout.
            } ## end foreach.
        } ## end lock block.
    } ## end while.

    {
        lock $l_DETACHING;
        if ( ! threads->is_detached() ) { threads->detach(); };
        threads->exit();
    }
    return $DEFAULTS->{'ret_success'};
} ## end thread_timer().

sub is_pid_alive {
    my $pid    = shift;
    my $msg    = q{};
    my $status = $DEFAULTS->{'ret_success'};

    if ( kill(0, $pid) ) {        ## Still alive.
        $msg    = "Process $pid is still running";
        $status = $DEFAULTS->{'ret_success'};
    } elsif ( $! == EPERM ) {     ## Changed UID.
        $msg    = "Process $pid has changed UID";
        $status = $DEFAULTS->{'ret_success'};
    } elsif ( $! == ESRCH ) {     ## Died or zombied.
        $msg    = "Process $pid has died or zombied";
        $status = $DEFAULTS->{'ret_failure'};
    } else {                      ## Could not locate.
        $msg    = "Process $pid search failure";
        $status = $DEFAULTS->{'ret_failure'};
    }

    if ( $DEFAULTS->{'debug'} ) { $q_PRINT->enqueue( "$msg" ); }
    return $status;
} ## end is_pid_alive().

sub get_information {
    my ( $job, $tid ) = @_;
    local ( *w_IN, *w_OUT, *w_ERR );
    my $pid;

    my $cleanup = sub {
        w_IN->close  if ( fileno(w_IN) );
        w_OUT->close if ( fileno(w_OUT) );
        w_ERR->close if ( fileno(w_ERR) );
        {
            lock %ACTIVE_PIDS;
            $ACTIVE_PIDS{'worker'}{$tid}{'pid'}       = -1;
            $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'} = -1;
        }
    };

    $pid = eval {
        my $val = int( rand($DEFAULTS->{'random'}) );
        open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep', $val );
    };
    if ( ( $@ ) || ( ! defined $pid ) ) {
        $q_PRINT->enqueue( "Thread $tid failed open for job $job" );
        $cleanup->();
        return $DEFAULTS->{'ret_failure'};
    }

    {
        lock %ACTIVE_PIDS;
        $ACTIVE_PIDS{'worker'}{$tid} =
            shared_clone( { 'pid' => $pid, 'pid_start' => time } );
    }

    waitpid( $pid, 0 );
    my $pid_exit_status = ${^CHILD_ERROR_NATIVE};
    if ( $pid_exit_status != 0 ) {
        $q_PRINT->enqueue( "- Thread $tid process $pid "
                         . "exit $pid_exit_status" );
        $cleanup->();
        { lock %COMPLETE; $COMPLETE{'failure'}++; }
        return $DEFAULTS->{'ret_failure'};
    }

    $q_PRINT->enqueue( "+ Thread $tid process $pid "
                     . "exit $pid_exit_status" );
    { lock %COMPLETE; $COMPLETE{'success'}++; }
    $cleanup->();
    return $DEFAULTS->{'ret_success'};
} ## end get_information().

sub shutdown_engine {
    $q_PRINT->enqueue( "Engine shutdown process started" );
    my $active_count = int( keys %{$ACTIVE_PIDS{'worker'}} );

    while ( $active_count > 0 ) {
        {
            lock %ACTIVE_PIDS;
            foreach my $tid ( keys %{$ACTIVE_PIDS{'worker'}} ) {
                if (    ( defined $ACTIVE_PIDS{'worker'}{$tid}{'pid'} )
                     || ( defined $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'} ) ) {
                    my $pid       = $ACTIVE_PIDS{'worker'}{$tid}{'pid'};
                    my $pid_start = $ACTIVE_PIDS{'worker'}{$tid}{'pid_start'};
                    if ( ( $pid < 0 ) || ( $pid_start < 0 ) ) {
                        delete( $ACTIVE_PIDS{'worker'}{$tid} );
                    } ## end if pid check.
                } else {
                    delete( $ACTIVE_PIDS{'worker'}{$tid} );
                } ## end if defined.
            } ## end foreach active pid.
        } ## end lock block.
        usleep( 250_000 );
        $active_count = int( keys %{$ACTIVE_PIDS{'worker'}} );
    } ## end while active count.

    $f_TERM_TIMER = 1;
    usleep( 250_000 );
    while ( $q_PRINT->pending() > 0 ) { usleep( 250_000 ); }
    $f_TERM_PRINT = 1;
    return $DEFAULTS->{'ret_success'};
} ## end shutdown_engine().

MAIN: {
    my $q_jobs = Thread::Queue->new();
    for ( 1..$DEFAULTS->{'test_jobs'} ) {
        my $job = q{JOB-}.$_;
        $q_jobs->enqueue($job);
    }

    my $thr_pm = threads->create( \&print_manager )->detach();
    my $thr_tm = threads->create( \&thread_timer )->detach();

    my %work_queues;
    for ( 1..$DEFAULTS->{'max_threads'} ) {
        my $q_work = Thread::Queue->new();
        my $thr    = threads->create( \&thread_worker, $q_work );
        $work_queues{$thr->tid()} = $q_work;
    } ## end for max_threads.

    while ( ! $f_TERM ) {
        usleep( 250_000 );
        $q_PRINT->enqueue( "Active threads " . int( threads->list() ) );
        my $tid = $q_IDLE->dequeue();
        last if ( $tid < 0 );
        if ( $q_jobs->pending() > 0 ) {
            my $work = $q_jobs->dequeue_nb();
            $work_queues{$tid}->enqueue( $work );
        } else {
            $q_PRINT->enqueue( 'No More Jobs to Submit' );
            $f_TERM = 1;
        } ## end if pending jobs.
    } ## end while ! f_TERM.

    $work_queues{$_}->enqueue(-1) foreach keys(%work_queues);
    $_->join() foreach threads->list();
} ## end MAIN.

shutdown_engine();

print '*' x 60, "\n";
print 'Success: ', $COMPLETE{'success'}, "\n";
print 'Failure: ', $COMPLETE{'failure'}, "\n";
print '*' x 60, "\n";
print("Done\n");

exit $DEFAULTS->{'exit_success'};

Re: RFC: Using 'threads' common aspects
by Anonymous Monk on Jan 08, 2011 at 12:55 UTC
      Fair enough, easy to implement, and can provide an additional level of variable restriction.
Re: RFC: Using 'threads' common aspects
by BrowserUk (Pope) on Jan 08, 2011 at 14:16 UTC

    A couple of questions.

    • What do you think you are achieving by using the :locked attribute?
    • Do you realise that what you've implemented is not a thread-pool?

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      I am presuming that you are referring to the independent threads that manage the print_manager and the thread_timer aspect of the program. Yes, those threads are outside of the pool and are started near the top of MAIN.
      my $thr_pm = threads->create( \&print_manager )->detach();
      my $thr_tm = threads->create( \&thread_timer )->detach();
      However, the worker aspect of the program operates from a pool of threads, which are reused and defined by:
      for ( 1..$DEFAULTS->{'max_threads'} ) {
          my $q_work = Thread::Queue->new();
          my $thr    = threads->create( \&thread_worker, $q_work );
          $work_queues{$thr->tid()} = $q_work;
      } ## end for max_threads.
      I am expecting the ':locked' attribute to ensure that no other thread, intentional or otherwise, will access the subroutine while the active thread has control. Do you read otherwise? If so, how?
        I am expecting the ':locked' attribute to ensure that no other thread, intentional or otherwise, will access the subroutine while the active thread has control.

        If you read the threads documentation, the :locked attribute isn't mentioned because it does nothing. Quite literally nothing. It is a noop.

        It is a non-functioning leftover from perl5005 threads that has never had any effect on iThreads.
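
        If serialized access to a subroutine is what is actually wanted under iThreads, the documented mechanism is an explicit lock() on a shared variable at the top of the sub. A minimal sketch (the $serialize semaphore is invented here for illustration; it is not code from either post):

        use threads;
        use threads::shared;

        my $serialize :shared;       ## semaphore guarding the sub below

        sub one_at_a_time {
            my ($msg) = @_;
            lock($serialize);        ## other threads block here until the current
                                     ## caller leaves the sub, at which point the
                                     ## lock is released automatically with scope
            print STDOUT "$msg\n";
            return 1;
        }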

        I am presuming that you are referring to the independent threads that manage the print_manager and the thread_timer aspect of the program.

        No. Having a couple of other threads outside of the worker pool isn't a problem. That's quite normal.

        The problem is that you manage each of the threads in your 'pool' as individual entities. Ie. Each has a separate queue. Which means that your dispatcher has to allocate and coordinate work to each worker individually. The result of which is the excessive complexity of your implementation. With a traditional thread pool, all the threads are identical and often anonymous. And they get their work from a single work queue.
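
        For illustration only (this is not code from the thread), that single-work-queue shape can be sketched in a few lines with Thread::Queue: identical, anonymous workers all dequeue from one shared queue, with undef used here as a per-worker stop token.

        use strict;
        use warnings;
        use threads;
        use Thread::Queue;

        my @jobs   = map { "JOB-$_" } 1 .. 20;    ## stand-in work items
        my $q_work = Thread::Queue->new();        ## the single 'carousel'

        my @pool = map {
            threads->create( sub {
                while ( defined( my $job = $q_work->dequeue() ) ) {
                    print "worker ", threads->tid(), " handled $job\n";
                }
            } );
        } 1 .. 4;                                 ## pool size; any worker does any job

        $q_work->enqueue(@jobs);                  ## producer just loads the belt
        $q_work->enqueue( (undef) x @pool );      ## one stop token per worker
        $_->join() for @pool;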

        Think of this as analogous to the baggage carousels in airports. The bags are loaded onto the belt back-of-house, and the 'workers', passengers in this scenario, collect the bags from the carousel entirely independently of each other, and of the loading.

        What you've done is to replace the carousel with a fan of one-to-one conveyors, and placed a dispatcher between the inbound conveyor and that fan. He has to take the bags off the inbound conveyor, read the labels, and decide which of the fan of conveyors he needs to place each bag on for it to get to the appropriate customer.

        You've added huge complexity--the fan of one-customer-at-a-time, one-bag-at-a-time conveyors; and created a bottle-neck--the dispatcher--upon whom all the customers have to wait. Not to mention all the tagging, control and coordination problems created by the need to route each customer to the right conveyor and each bag to the right customer.

        In the process, you've made some very strange choices.

        For example, your 'jobs' queue. You create a shared array, onto which you push all your jobs en masse. You then pull them off that queue and feed them onto the workers' individual queues. But it is the same thread that puts them on the jobs queue that takes them off, so there was no need for a shared data structure--with all the size and performance penalties it entails. A simple array would have sufficed.
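
        In other words (a hypothetical fragment, not taken from the post), the producer side needs nothing more than:

        use Thread::Queue;

        my $q_work = Thread::Queue->new();      ## the pool's single shared queue
        my @jobs   = map { "JOB-$_" } 1 .. 20;  ## plain lexical array: only the main
                                                ## thread ever touches it, so no :shared,
                                                ## no locking, no second queue needed
        $q_work->enqueue($_) for @jobs;         ## hand the work straight to the workers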

        And the rest of your code is full of similar anomalies.

        I'm trying very hard not to be negative about someone trying to do what I have declined to do, but the truth is that, as a tutorial, your code is actually worse than no tutorial, because it teaches a lot of bad practices. And once out there, they become very hard to unlearn, and the time-bombs they will create become very hard to defuse.

        Given the (unwritten, despite your original claim to "solid documentation") 'spec' of your 400+ line program, its function (such as it is) can be easily and far, far more clearly achieved with 30 or 40 lines of cleaner, clearer and more easily maintained code. It is typical of code written to "demonstrate some functionality" rather than solve a problem. It is also typical of a program written by someone who has yet to try and use iThreads in a real application.

        Sorry.


Re: RFC: Using 'threads' common aspects
by sundialsvc4 (Monsignor) on Jan 13, 2011 at 01:40 UTC

    It’s pretty much “a given” that, when you want to have a pool of worker-threads doing anything-at-all, there should be a configurable number of identical threads, each one reading from a single queue of requests and sending results back to a single queue of completed work for downstream post-processing.
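
    A sketch of that shape (illustrative only; the names and the fixed worker count are invented here): a configurable number of identical workers drain one request queue and push onto one results queue for downstream post-processing.

    use strict;
    use warnings;
    use threads;
    use Thread::Queue;

    my $MPL       = 10;                       ## configurable number of workers
    my $q_request = Thread::Queue->new();     ## single queue of incoming requests
    my $q_done    = Thread::Queue->new();     ## single queue of completed work

    my @workers = map {
        threads->create( sub {
            while ( defined( my $req = $q_request->dequeue() ) ) {
                $q_done->enqueue( "result of $req" );   ## downstream code dequeues these
            }
        } );
    } 1 .. $MPL;

    $q_request->enqueue("request-$_") for 1 .. 50;
    $q_request->enqueue( (undef) x $MPL );    ## stop tokens, one per worker
    $_->join() for @workers;
    print $q_done->pending(), " results awaiting post-processing\n";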

    What this allows you to set is what IBM called the “multiprogramming level” (MPL), which is simply “the maximum number of requests that the system will attempt to service simultaneously.”   If requests pile up in the queue, they will eventually be served by “some worker... any one will do.”   If the queue runs dry, “one or more workers” will simply be asleep, waiting for the phone to ring.

    If you have several distinctly-different classes of requests, consider having a separate queue for each class.   Have the workers wait for a message to arrive in any of the queues that they are interested in, and implement some kind of queue-preference mechanism.   But, keep it just as simple as you can.   Don’t add gratuitous complexity to your design ...

      For the record:

      what IBM called the “multiprogramming level” (MPL),

      has exactly nothing to do with threading, identical or otherwise. Nor does it have anything to do with “the maximum number of requests that the system will attempt to service simultaneously.”

      It has to do with memory management in a VM OS. Nowadays called a hypervisor though it long pre-dates that term.

      Don’t add gratuitous complexity to your design ...

      Hah! And you're the guy that offered this sage advice.



        Oh, come now, mine very-public opponent (and quite against my will) ... sure it does.

        The multiprogramming level, whatever you call it, is “the (maximum...) amount of simultaneous activity that the program in question will attempt.”   And that is, the number of threads in the pool.   Whether the system has 10 requests to do or 100, if the MPL is set to 10 (by whatever means), there are 10 threads working and there are 90 requests in the queue.   This avoids having the system overcommit itself, overload virtual memory or some other resource, and smash into the bloody wall of exponentially-degrading completion times.   Throughput (as perceived by the system’s clients) is controlled by the queue-selection algorithms, but the system will not smash its own face under heavy load.

        My other advice is simply, model and measure the thing as best you can and figure out what sort of workload-management calisthenics these data tell you is actually required.   It is very easy to build a system that is truly more complex than it need be, and that, for being so complex, is actually less reliable.   I’ve done it; so have many others.   It’s just like biology:   “in a perfectly-designed experiment, under the most carefully controlled conditions, the little mouse will do as he damn well pleases...”
