PerlMonks  

Re^4: RFC: Using 'threads' common aspects

by DeadPoet (Scribe)
on Jan 08, 2011 at 23:49 UTC (node #881287)


in reply to Re^3: RFC: Using 'threads' common aspects
in thread RFC: Using 'threads' common aspects

Okay, let me back this off a bit and see if I can get this correct. For now, I have dropped the timers, and I "think" I have made the corrections you suggested. However, I am not 100% sure and would appreciate your eyes.

If there are glaring errors, then please provide insight as to what I have done wrong.
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw( usleep time );
use IPC::Open3;

my $DEFAULTS = {
    'exit_success' => 0,    ## success exit.
    'exit_failure' => 1,    ## failure exit.
    'ret_success'  => 1,    ## success return.
    'ret_failure'  => 0,    ## failure return.
    'random'       => 10,   ## Random amount of time to sleep.
    'max_threads'  => 10,   ## Max # of threads to maintain.
    'test_jobs'    => 20,   ## # of test jobs to run.
};

my @q_JOBS :shared;             ## shared jobs queue.
my $f_TERM :shared = 0;         ## main term flag.
my $f_TERM_PRINT :shared = 0;   ## print term flag.
my $l_PRINT :shared;            ## print lock
my $q_IDLE :shared;             ## idle queue.
$q_IDLE = Thread::Queue->new();
my $q_PRINT :shared;            ## print queue.
$q_PRINT = Thread::Queue->new();

local $SIG{'INT'} = local $SIG{'TERM'} = sub {
    $q_PRINT->enqueue( 'Signaled TERMINATION' );
    $f_TERM = 1;
    $q_IDLE->insert(0, -1);
};

local $SIG{'KILL'} = sub { threads->exit(); };

sub main {
    ## populate a dummy list of jobs to work.
    ## this could be some server list, or whatever.
    @q_JOBS = map{ 'JOB-'.$_ }(1..$DEFAULTS->{'test_jobs'});

    my $thr_pm = threads->create( \&print_manager )->detach();

    for ( 1..$DEFAULTS->{'max_threads'} ) {
        my $thr_wk = threads->create( \&thread_worker );
    } ## end for max_threads.

    while ( ! $f_TERM ) {
        usleep( 250_000 );
        my $tid = $q_IDLE->dequeue();
        last if ( $tid < 0 );
    }
    $q_PRINT->enqueue( 'Shutdown on main started' );
    shutdown_engine();
} ## end main.

sub shutdown_engine {
    ## Centralized cleanup and shutdown.
    $_->join foreach ( threads->list() );
    while( $q_PRINT->pending() > 0 ) {
        usleep( 250_000 );
    }
    $f_TERM_PRINT = 1;
    exit $DEFAULTS->{'exit_success'};
}

sub get_next_job {
    lock @q_JOBS;
    return shift @q_JOBS;
}

sub thread_worker {
    my $tid = threads->tid();
    while ( ( $#q_JOBS > 0 ) && ( ! $f_TERM ) ) {
        $q_PRINT->enqueue( "Thread $tid is IDLE" );
        $q_IDLE->enqueue($tid, 0);
        my $job = get_next_job();
        do_work( $job, $tid );   ## do whatever work...
    } ## end
    $q_PRINT->enqueue( "Thread $tid is FINISHED" );
    $q_IDLE->insert($tid, -1);
    return $DEFAULTS->{'ret_success'};
} ## end thread_worker.

sub run_external_command {
    ## serves as a place for extensive
    ## external command where output
    ## and secondary processing may
    ## take place.
    my ( $job, $tid ) = @_;
    $q_PRINT->enqueue( "Thread $tid is running exteral job $job" );
    local( *w_IN, *w_OUT, *w_ERR );
    my $pid = open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep',
                     int( rand( $DEFAULTS->{'random'} ) ) );
    waitpid( $pid, 0 );
    return $DEFAULTS->{'ret_success'};
}

sub do_work {
    ## do_work could be a wrapper to whatever type
    ## of work needs to be accomplished. For now,
    ## I will just sleep.
    my ($job, $tid) = @_;
    $q_IDLE->enqueue($tid, 1);
    $q_PRINT->enqueue( "Thread $tid is WORKING [$job]" );
    ## pretend it takes some time before running the
    ## external command -- i.e. mix shit up a bit.
    sleep( int( rand( 5 ) ) );
    run_external_command( $job, $tid );
    $q_PRINT->enqueue( "Thread $tid COMPLETED [$job]" );
    return $DEFAULTS->{'ret_success'};
} ## end do_work.

sub timestamp {
    return localtime time;
} ## end timestamp.

sub print_manager {
    lock $l_PRINT;
    while ( ! $f_TERM_PRINT ) {
        usleep(100_000);
        print STDOUT '[' . timestamp() . ']: ' . $q_PRINT->dequeue() . "\n";
    }
    threads->exit();
} ## end print_manager.

main();
exit $DEFAULTS->{'exit_success'};

__END__


Re^5: RFC: Using 'threads' common aspects
by BrowserUk (Pope) on Jan 09, 2011 at 16:36 UTC
    • Jobs queue/array:

      Okay. You took my point about your jobs queue only being accessed by a single thread and reverted that to being just an array rather than a queue. You also took my point about the fan of 1-per-worker queues, and did away with them. But now, as you've done away with the latter, it is necessary for the workers to access the former, so you've made the jobs array shared. Of course, that means that you need to lock the shared array before removing jobs from it, so you've added the get_next_job() subroutine that takes care of the locking. And what you've ended up doing is incompletely re-implementing Thread::Queue, which is just a shared array that does locking.

      What you could (should?) have done is just do away with the per-worker queues and retain the jobs queue. Then your workers get their jobs from that queue using dequeue in the normal way, and get_next_job() is unnecessary.

    • Idle queue:

      In your thread subroutine you push the tid and zero each time around the work loop. And when the work loop ends (no more jobs), you push the tid and -1. And in the do_work() sub you push the tid and 1.

      Why? What code is reading and acting upon these messages?

      The only place where you read this queue is in the main() sub. After you've placed all the jobs in the array and started all the workers, you enter a loop that dequeues values from the idle queue. But it dequeues one value at a time and labels it $tid, even though each time you enqueued a tid you also enqueued a magic number (-1, 0 or 1). It then exits that loop if the value (called $tid, but half the time actually one of the magic numbers) is less than 0.

      Let's say you queue up 4 jobs, start 2 workers, and each worker processes 2 jobs. Then the contents of the idle queue (ignoring anything being removed for a moment) will be something like this (the ordering is non-deterministic):

      tid  magic  enqueued by
        1      0  Worker 1 first job in while loop
        1      1  Worker 1 first job in do_work
        2      0  Worker 2 first job in while loop
        2      1  Worker 2 first job in do_work
        1      0  Worker 1 second job in while loop
        1      1  Worker 1 second job in do_work
        2      0  Worker 2 second job in while loop
        2      1  Worker 2 second job in do_work
        1     -1  Worker 1 after exiting while loop
        2     -1  Worker 2 after exiting while loop

      But, as mentioned above, that ordering is non-deterministic, so it could also be:

      tid  magic  enqueued by
        1      0  Worker 1 first job in while loop
        1      1  Worker 1 first job in do_work
        1      0  Worker 1 second job in while loop
        1      1  Worker 1 second job in do_work
        1     -1  Worker 1 after exiting while loop
        2      0  Worker 2 first job in while loop
        2      1  Worker 2 first job in do_work
        2      0  Worker 2 second job in while loop
        2      1  Worker 2 second job in do_work
        2     -1  Worker 2 after exiting while loop

      tid  magic  enqueued by
        2      0  Worker 2 first job in while loop
        2      1  Worker 2 first job in do_work
        2      0  Worker 2 second job in while loop
        2      1  Worker 2 second job in do_work
        2     -1  Worker 2 after exiting while loop
        1      0  Worker 1 first job in while loop
        1      1  Worker 1 first job in do_work
        1      0  Worker 1 second job in while loop
        1      1  Worker 1 second job in do_work
        1     -1  Worker 1 after exiting while loop

      Or any of a dozen other variations. So what is it telling the main loop?

      Remember that the loop in main will terminate the first time it sees a value less than zero. If you look at the above possibilities, that will never actually indicate that all the threads have completed, which is what I think was your intent. Even with only two threads and the most optimistic ordering, there will always be at least two values that are never removed from that queue. In the worst-case scenario, which will happen far more often than statistics alone would suggest, the while loop in main() will terminate while all the workers except one are still processing many jobs.

      What effect will the worst-case scenario have upon your program?

      Actually, nothing. As soon as you exit the while loop in main(), you call shutdown_engine(), and the first thing it does is loop over all the thread objects and wait for them to join, i.e. end.

      The net result is that the entire idle queue mechanism is not only totally broken, it is also entirely redundant.

      Or at least it would be if you weren't detaching your threads which makes the joins redundant. But that also means that those return values you set up and return from your workers are also redundant.

    • Print manager:

      Why are you queueing all your messages to a separate thread in order to have them printed?

      Your answer will be something to the effect that you need to serialise the output from multiple threads to the single resource--the screen or file attached to STDOUT. And that is true. But do you need to use a separate thread and queue to do this?

      And the answer is no. It is a waste of resources and an additional level of redundant complexity. A simple print wrapper combined with a single shared variable acting as a mutex is all that is required. E.g.:

      my $semSTDOUT : shared;

      sub tprint {
          lock $semSTDOUT;
          print @_;
      }

      tprint "$tid: Some text";

      Simpler to write and maintain and far more flexible and convenient to use.

    • run_external_command()
      sub run_external_command {
          my ( $job, $tid ) = @_;
          $q_PRINT->enqueue( "Thread $tid is running exteral job $job" );
          local( *w_IN, *w_OUT, *w_ERR );
          my $pid = open3( \*w_IN, \*w_OUT, \*w_ERR, '/bin/sleep',
                           int( rand( $DEFAULTS->{'random'} ) ) );
          waitpid( $pid, 0 );
          return $DEFAULTS->{'ret_success'};
      }

      Why are you using open3 instead of, say, just system, which does everything your code currently does?

      I appreciate that you were originally limiting the time the command ran for, which means you need a pid in order to use kill. But you can also get a pid from a piped open, and that doesn't suffer the problems associated with open3 (namely, that juggling two output streams, either of which can block the process if it fills, is notoriously difficult to get right).

    • timestamp()

      is never used.

    • local $SIG{'KILL'} = sub { threads->exit(); };

      Why? If all you are going to do is terminate, why not just let that happen?

    • Finally %DEFAULTS.

      You set up this hash named %DEFAULTS containing a bunch of named values.

      But, there is no mechanism for modifying them, so they aren't defaults, but rather constants. So why not use constant?

      In addition, none of those return values is ever assigned anywhere, much less checked or acted upon.
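The idle-queue point above can be checked without starting any threads by replaying the queue contents through the same loop main() uses. This is a hypothetical single-threaded sketch: drain_like_main() and the two arrays are illustrative names, and the arrays simply hard-code the first two orderings from the tables above rather than anything the real program produced.

```perl
use strict;
use warnings;

# Replay a flat idle queue (as left by enqueue($tid, $magic)) through the
# same logic as the loop in main(): take one value at a time and stop at the
# first value below zero. Returns how many values were never looked at.
sub drain_like_main {
    my @queue = @_;                # work on a copy
    while (@queue) {
        my $tid = shift @queue;    # may be a real tid OR a magic number
        last if $tid < 0;
    }
    return scalar @queue;
}

# First ordering from the tables: the two workers interleaved.
my @interleaved = ( 1,0, 1,1, 2,0, 2,1, 1,0, 1,1, 2,0, 2,1, 1,-1, 2,-1 );

# Second ordering: worker 1 finishes everything before worker 2 starts.
my @serial      = ( 1,0, 1,1, 1,0, 1,1, 1,-1, 2,0, 2,1, 2,0, 2,1, 2,-1 );

printf "interleaved: %d values left unread\n", drain_like_main(@interleaved);
printf "serial:      %d values left unread\n", drain_like_main(@serial);
```

In the interleaved case two values are left behind; in the serial case the loop stops after worker 1's -1, before it has seen anything from worker 2, which is the "main stops while other workers are still busy" situation described above.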

    So, here is how I would write a program to do what I believe you intend your program to do.

    Stylistically, it is written in my (infinitely preferable and superior :) style, but it does essentially everything yours attempts to do. And it does it correctly, in a simple, clear, easily understood and maintainable way.

    #! perl -slw
    use strict;
    use threads;
    use threads::shared;
    use Thread::Queue;

    use constant {
        RANDOM  => 4,
        THREADS => 4,
        JOBS    => 20,
    };

    my $semSTD :shared;

    sub tprint {
        my $tid = threads->tid;
        lock $semSTD;
        print "[$tid] ", @_;
    }

    my $die_early :shared = 0;

    $SIG{ INT } = sub {
        tprint "Early termination requested";
        $die_early = 1;
    };

    sub worker {
        tprint "worker started";
        my( $Q ) = @_;
        while( !$die_early and defined( my $job = $Q->dequeue ) ) {
            tprint "processing job:$job";
            my $pid = open my $PIPE, '-|', "sleep " . int( rand RANDOM ) or die $!;
            tprint "waiting for pid: $pid";
            waitpid $pid, 0;
            tprint "pid: $pid done";
        }
        tprint "Worker ending";
        return 1;
    }

    my $Q = new Thread::Queue;
    $Q->enqueue( map "JOB-$_", 1 .. JOBS );
    $Q->enqueue( (undef) x THREADS );
    tprint "Queue populated";

    my @threads = map threads->new( \&worker, $Q ), 1 .. THREADS;
    tprint "Workers started; waiting...";

    $_->join for @threads;
    print "Program complete";

    __END__
    C:\test>881217
    [0] Queue populated
    [1] worker started
    [1] processing job:JOB-1
    [1] waiting for pid: 1740
    [2] worker started
    [2] processing job:JOB-2
    [1] waiting for pid: 1056
    [2] pid: 2684 done
    [2] processing job:JOB-4
    [1] waiting for pid: 2116
    [3] worker started
    [3] processing job:JOB-6
    [3] waiting for pid: 3244
    [0] Workers started; waiting...
    [4] worker started
    [4] processing job:JOB-7
    ...
    [3] pid: 1112 done
    [3] processing job:JOB-20
    [3] waiting for pid: 4048
    [2] pid: 1728 done
    [1] pid: 3432 done
    [2] Worker ending
    [1] Worker ending
    [4] pid: 3924 done
    [4] Worker ending
    [3] pid: 4048 done
    [3] Worker ending
    Program complete

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Wow, a lot to digest....

      Let me try to address a couple of your questions.

      Let's start with "Why open3?"

      Open3 provides me an easy method for splitting IO communication. Most of the stuff I deal with is over SecureShell, and frankly I like the way the output is available via the appropriate handles. Moreover, the ability to time out a process is essential whether the command is local or remote. Take the example of the 'df' command on HP-UX: what happens if one runs the command and it hits a hung NFS file system? The command hangs, and with it the thread, and eventually the program. The point is not that one could exclude NFS from the command, but that the unexpected happens when dealing with 3000 servers. Just because one terminates the thread or breaks the pipe does not mean the process will not remain. SecureShell is a great example of this.
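DeadPoet wants separate STDOUT and STDERR handles, while BrowserUk's caveat about open3 is that either pipe can fill up and block the child. One common way to get both is to watch the two pipes with IO::Select and drain them with sysread as data arrives. This is a sketch, not code from either poster: capture3() is a hypothetical helper, it assumes a *nix system with sh, and it does no timeout handling.

```perl
use strict;
use warnings;
use IPC::Open3 qw( open3 );
use IO::Select;
use Symbol qw( gensym );

# Run @cmd and return ( $status, $stdout, $stderr ).
# Both pipes are watched with IO::Select and drained with sysread as soon as
# data arrives, so the child never stalls on a full STDOUT or STDERR pipe.
sub capture3 {
    my @cmd = @_;
    my $err = gensym;    # STDERR needs its own glob, or open3 merges it into STDOUT
    my $pid = open3( my $in, my $out, $err, @cmd );
    close $in;           # the child gets no input from us

    my ( $stdout, $stderr ) = ( '', '' );
    my $sel = IO::Select->new( $out, $err );
    while ( my @ready = $sel->can_read ) {
        for my $fh (@ready) {
            my $n = sysread( $fh, my $chunk, 65536 );
            if ( !$n ) {                  # 0 means EOF, undef an error: stop watching
                $sel->remove($fh);
                next;
            }
            if   ( $fh == $out ) { $stdout .= $chunk }
            else                 { $stderr .= $chunk }
        }
    }
    waitpid( $pid, 0 );                   # reap the child; its status lands in $?
    return ( $?, $stdout, $stderr );
}

my ( $status, $got_out, $got_err ) =
    capture3( 'sh', '-c', 'echo to-stdout; echo to-stderr 1>&2; exit 3' );
print "exit status: ", $status >> 8, "\n";
print "STDOUT: $got_out";
print "STDERR: $got_err";
```

Using sysread rather than readline matters here: buffered reads can pull data into Perl's buffer that select() then no longer reports, which makes the loop hang.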

      As for DEFAULTS: yes, I could flag it as READONLY or CONSTANT; it is just a placeholder for program defaults. However, if I expanded such a program to use getopts to specify maximum, minimum, etc. values, the constant/readonly aspect would become moot.
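Keeping defaults in one place and letting command-line switches override them can look roughly like this. A minimal sketch, assuming the core Getopt::Long module (rather than Getopt::Std); %DEFAULTS here is a plain hash instead of the original hashref, and parse_settings() and the option names are hypothetical.

```perl
use strict;
use warnings;
use Getopt::Long qw( GetOptionsFromArray );

# Built-in defaults; any of them may be overridden on the command line.
my %DEFAULTS = (
    max_threads => 10,
    test_jobs   => 20,
    random      => 10,
);

# Return a new hash ref of settings: a copy of the defaults, with any
# switches found in @args written over the top. %DEFAULTS itself is untouched.
sub parse_settings {
    my @args     = @_;
    my %settings = %DEFAULTS;
    GetOptionsFromArray( \@args, \%settings,
        'max_threads=i', 'test_jobs=i', 'random=i' )
        or die "usage: $0 [--max_threads N] [--test_jobs N] [--random N]\n";
    return \%settings;
}

my $cfg = parse_settings(@ARGV);
print "$_ = $cfg->{$_}\n" for sort keys %$cfg;
```

Run as `script.pl --max_threads 4`, only max_threads changes; the other two keep their default values.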

      Central printing was just an example. Yes, I totally understand that each thread can manage its own printing with a simple lock variable, and that is without a doubt the more efficient approach. Thus, it should have been my first choice.

      Timestamp(): this is used by the print_manager. In my business, timestamps are everything, but in this example it is just an indicator of program progress.

      sub print_manager {
          lock $l_PRINT;
          while ( ! $f_TERM_PRINT ) {
              usleep(100_000);
              print STDOUT '[' . timestamp() . ']: ' . $q_PRINT->dequeue() . "\n";
          }
          threads->exit();
      } ## end print_manager.

      For the remaining topics, I need to digest and think about everything. I think I see where you are going, but I also think you may be missing the importance of ensuring that bad processes do not remain in the process table.

      Once again, I must give thanks for your time and thoughts.

      --Poet
        Let's start with "Why open3?" Open3 provides me an easy method for splitting IO communication. Most of the stuff I deal with is over SecureShell, and frankly I like the way the output is available via the appropriate handles. Moreover, the ability to time out a process is essential whether the command is local or remote. Take the example of the 'df' command on HP-UX: what happens if one runs the command and it hits a hung NFS file system?

        None of this was mentioned in either of your two threads. You offered this as an RFC:tutorial, not a chopped down version of a real application with an onerous set of very specific (and till now, unmentioned) technical requirements.

        I hope you realise that I cannot read your mind.

        I also think you may be missing the importance of ensuring that bad processes do not remain in the process table.

        In my sample, I only attempted to match the functionality of your latest code. Your latest code dropped the timeout-and-kill-the-process part, so I didn't implement it.

        But adding that functionality back to my sample code would require maybe 5 more lines. (For *nix, maybe 10 for Win32.)
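The extra lines are not shown in the thread. One possible shape for the *nix case, offered as a sketch rather than BrowserUk's code: keep the piped open (which returns the child's pid), poll the child with a non-blocking waitpid until a deadline, and kill it if it overruns. Polling avoids relying on alarm(), since mixing signals and threads is awkward in Perl. run_with_timeout(), the 0.1 s poll interval and the use of SIGKILL are all assumptions, and because the child's output is never read, this only suits commands with small output, like the sleep in the sample.

```perl
use strict;
use warnings;
use POSIX qw( WNOHANG );
use Time::HiRes qw( sleep time );

# Start @cmd via a piped open, poll it with a non-blocking waitpid, and kill
# it if it has not exited within $timeout seconds.
# Returns the child's exit status ($?) if it finished, or undef if it was killed.
sub run_with_timeout {
    my ( $timeout, @cmd ) = @_;
    my $pid = open( my $PIPE, '-|', @cmd ) or die "Cannot run '@cmd': $!";
    my $deadline = time() + $timeout;
    while ( time() < $deadline ) {
        if ( waitpid( $pid, WNOHANG ) == $pid ) {
            my $status = $?;    # copy it before $PIPE is implicitly closed
            return $status;
        }
        sleep 0.1;              # Time::HiRes sleep accepts fractions
    }
    kill 'KILL', $pid;          # overran: kill the child...
    waitpid( $pid, 0 );         # ...and reap it so no zombie is left behind
    return undef;
}
```

In the worker loop, the `open my $PIPE ...; waitpid $pid, 0;` pair could then become something like `defined run_with_timeout( 30, 'sleep', int rand RANDOM ) or tprint "job:$job timed out";`. Note that this only kills the local child: as DeadPoet points out, killing a local ssh client does not guarantee the remote command goes away.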


      local $SIG{'KILL'} = sub { threads->exit(); };

      Why? If all you are going to do is terminate, why not just let that happen?

      Well, it will "just happen". SIGKILL can't be caught, blocked or ignored. The process will simply be forced to exit when a SIGKILL is sent, the sub will never be invoked. Unless, of course, you run that code on a very strange non-POSIX system with a different signals implementation.
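This is easy to confirm on a POSIX system: a %SIG handler installed for TERM runs, while one installed for KILL is never called and the child simply dies with signal 9. A small demonstration, assuming a *nix perl with fork(); signal_child() and the marker exit code 42 are arbitrary illustrative choices.

```perl
use strict;
use warnings;
use POSIX qw( WIFSIGNALED WTERMSIG WIFEXITED WEXITSTATUS );

$| = 1;    # unbuffered, so forked children cannot re-flush our output

# Fork a child that installs a %SIG handler for $signame (the handler exits
# with status 42), wait until the child reports the handler is in place,
# then send it $signame and describe how it ended.
sub signal_child {
    my ($signame) = @_;
    pipe( my $reader, my $writer ) or die "pipe: $!";
    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ( $pid == 0 ) {                    # child
        close $reader;
        $SIG{$signame} = sub { exit 42 };
        print {$writer} "ready\n";
        close $writer;
        sleep 30;                         # wait here to be signalled
        exit 0;
    }

    close $writer;                        # parent
    my $ready = <$reader>;                # blocks until the handler is installed
    kill $signame, $pid;
    waitpid( $pid, 0 );
    return WIFSIGNALED($?) ? 'killed by signal ' . WTERMSIG($?)
         : WIFEXITED($?)   ? 'exited with status ' . WEXITSTATUS($?)
         :                   "unexpected status $?";
}

print 'TERM: ', signal_child('TERM'), "\n";   # the handler runs
print 'KILL: ', signal_child('KILL'), "\n";   # the handler never runs
```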

      Alexander

      --
      Today I will gladly share my knowledge and experience, for there are no sweeter words than "I told you so". ;-)
        SIGKILL can't be caught, blocked or ignored.

        All of which makes you wonder why it is a signal at all.

        Unless, you run that code on a non-POSIX system

        Well that's a possibility. But "non-POSIX" doesn't equate to "very strange".


