Beefy Boxes and Bandwidth Generously Provided by pair Networks
Don't ask to ask, just ask
 
PerlMonks  

Re^2: How can I force thread switching?

by ajl52 (Novice)
on Sep 29, 2013 at 09:04 UTC ( #1056197=note: print w/ replies, xml ) Need Help??


in reply to Re: How can I force thread switching?
in thread How can I force thread switching?

Hi,

Trying to streamline my code to make it a standalone sample, I changed a bit the "alarm" criteria from $queuelen <= $throttle->pending() to a non-blocking semaphore decrement $throttle->down_nb().

It now works as expected without need for sleep(), nor yield().

However, my demo code shows a bug which is (apparently) not present in production code: one of the threads is silently terminated (without intentional error injection) and I can't find why.

Code example:

#!/usr/bin/perl # -*- tab-width: 4 -*- use strict; use threads; use threads::shared; use Thread::Queue; use Thread::Semaphore; use Time::HiRes ('usleep'); my $screenaccess :shared; # Lock for printing to screen my $multi :shared; # Number simultaneous threads my $logline; # Log line content my $logsize; # Log line size (not counting attributes) my @log :shared; # For debugging, incase a thread errors out my $dispatcher :shared; # Job queue my $queuelen; # Max queue length my $throttle :shared; # Queue semaphore my @thread; sub initThreadedOperation { $multi = shift; $queuelen = 1 * $multi; $dispatcher = Thread::Queue->new(); $throttle = Thread::Semaphore->new($queuelen); for (1 .. $multi) { push ( @thread , threads->create (\&processrefsthread, $_) ); } print STDERR scalar(@thread), ' threads created', "\n"; } sub syncIdle { # Check if any thread errored out my $abort = 0; for my $i (0..$#thread) { if ( !$thread[$i]->is_running() || $thread[$i]->is_joinable() ) { lock($screenaccess); print (STDERR 'ERROR: thread #' , 1+$i , ' encountered a problem while processing request +' , "\n" , $log[1+$i] , "\n" , 'Check the cause and eventually report a bug.' , "\n" ); $abort ||= 1; } } if ($abort) { endThreadedOperation(); print (STDERR "Flushing and aborting now ...\n"); print (STDERR 'The error message may have scrolled out due to +asynchronous operation. Check.', "\n"); exit(1); } while (0 < $dispatcher->pending()) { threads->yield(); # sleep(1); # Retry later } } sub endThreadedOperation { $dispatcher->enqueue((undef) x scalar(@thread)); foreach (@thread) { $_->join(); } } sub logStart { my ($where, $text, $attr) = @_; if ( $where < 0 || $where > $multi ) { return; } $logsize = length($text); $logline = $attr . $text; $log[$where] = $logline; # For debugging lock($screenaccess); print (STDERR '++ ' , $text , "\n" ); } sub logFinal { my ($where, $text, $attr) = @_; if ( $where < 0 || $where > $multi ) { return; } $logline .= $attr . $text; lock($screenaccess); print (STDERR '-- ' , $logline , "\n" ); $logline = undef; $log[$where] = undef; } sub processrefsthread { my $logid = shift; logFinal($logid, "Thread #$logid created\n", ''); while (my $job = $dispatcher->dequeue()) { $throttle->up(); # Job removed from queue my $d = int(rand(5)); logStart ( $logid , "$logid processing $job (duration $d)" , '' ); # sleep($d); for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } ####### # Inoculate a bug # # # if (1 == $logid) # { # logFinal($logid, "\nkilling thread #$logid", ''); # my $z = 0; # $z = 1/$z; # # threads->exit(); # } ####### logFinal($logid, ' ... Done', ''); } } sub queueProcessRequest { my ($job) = @_; # If the queue fills up, it may be caused by threads killed # by an error. In this case, we'll be blocked forever below. # Then let's have a look on the threads. # if ($queuelen <= $dispatcher->pending()) { if (!$throttle->down_nb()) { { lock($screenaccess); print STDERR "***** Queue full\n"; } # threads->yield(); # sleep(1); # Give a chance foreach my $t (@thread) { if ( !$t->is_running() || $t->is_joinable() ) { syncIdle(); # Diagnose and abort } } $throttle->down(); } $dispatcher->enqueue($job); return undef } initThreadedOperation(4); for my $i (0..50) { queueProcessRequest ($i); } logFinal(0, "Queueing completed!\n", ''); syncIdle(); endThreadedOperation();

When run as is, it reports that a (randomly selected) thread is already terminated, before trying to process even its first job.

If I wrote something wrong, it might happen in the production code

Note: to cause threads to error out, uncomment the lines in "Inoculate a bug" block


Comment on Re^2: How can I force thread switching?
Select or Download Code
Re^3: How can I force thread switching?
by BrowserUk (Pope) on Sep 29, 2013 at 19:15 UTC
    one of the threads is silently terminated (without intentional error injection) and I can't find why.

    Your first thread to run will always terminate immediately because you are pushing a job id of zero:

    for my $i ( 0 .. 50 ) { queueProcessRequest( $i ); }

    Which when it is received:

    while (my $job = $dispatcher->dequeue()) {

    Test false and terminates the loop.

    Aside from that, your code is very confused. I have no idea what syncIdle is meant to be doing, and removing it, along with various other chunks of code that seem to serve no good purpose, I got to this which runs perfectly:

    That said, limiting your queue to 4 elements to feed 4 threads means that your threads are running in lock-step, which pretty much defeats the purpose of using a queue.

    And using Thread::Semaphore as the mechanism for limiting the size of the queue is like hiring the Pinkerton's to manage the queue for the bathroom in the mornings. Not just overkill, but by forcing lock-step (synchronising your threads), throws away much of the gain from asynchronous processing.

    Here's how I would write the same program:

    #!/usr/bin/perl -lw use strict; use threads; use threads::shared; use threads::Q; my $semStdout :shared; $|++; sub tprint{ my $str = shift; my $tid = threads->tid(); lock $semStdout; print "[$tid] $str"; } sub processrefsthread { my $Q = shift; tprint 'Starting'; while( my $job = $Q->dq() ) { tprint "processing job $job"; my $d = int(rand(5)); for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } } tprint 'Ending'; } our $T //= 4; my $Q = threads::Q->new( $T * 4 ); ## Q self-limits to 4*$T elements my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T; $Q->nq( $_ ) for 1 .. 50; $Q->nq( (undef) x $T ); $_->join for @threads;

    You can get threads::Q (my own, efficient, self-limiting queue implementation) from here.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Your first thread to run will always terminate immediately because you are pushing a job id of zero:

      Shame on me! Now you make the remark, it is obvious.

      Aside from that, your code is very confused.

      The actual code was simplified to caricature so that problems and bugs could occur rapidly.

      For instance, the queue length was made equal to the number of workers to reach saturation in a jiffie (have you noticed 1 * $multi which is totally silly).

      In the same line, job to be done was replaced by a number (forgetting that 0 = false, gasp!). In production code, it is a reference to a list containing various objects: strings and hashes. Since the hashes are rather big, I preferred to add a semaphore to slow down the dispatcher (it runs like a lightning and could stuff 50k requests in an instant) because I do not want to flood memory uselessly (to be memory-friendly). Of course, once the full process has warmed up, the dispatcher queues request only one at a time as the workers remove one job from the queue. But the queue is long enough (2-3 x workers) to avoid starving.

      The confused code may be a reference to my fiddling with is_running and is_joinable. This a (feeble?) attempt to protect against programming errors in the workers. I noticed that if a worker crashes I get error messages about unfinished threads when the main process terminates. The idea is to note something weird happened and to be able to print a message catching user attention because the original (Perl) error message may have scrolled away for a long time.

      When this condition is detected, syncIdle is launched to wait for active thread normal termination. syncIdle is also called by the dispatcher when it has finished queueing all requests to wait for request complete processing before doing something else.

      The situation is even more serious if all workers crashed. In this case, the queue will never be emptied. I need a way to regain control in the main thread (or somewhere else) otherwise I'm deadlocked.

      Since this is my first multi-thread Perl script, there may be a much better way to do it. Have you any suggestion?

      Concerning the logxxx, these are thread-safe (I hope) subs to write to the common screen. Every worker has a dedicated line on screen to log its activity. What was not shown is the role of the third argument $attr. It contains ANSI escape code to highlight the text in the second argument. It is not included in the text, so that the number of columns used on screen may be easily computed from the length of text (which is not supposed to contain zero-width Unicode characters or other multi-byte characters). This allows logAdd and logFinal to add their text at the tail of what is already on screen.

      Anyway, a big thank for the help.

      Your latest code shows you passed a reference to the queue as a thread argument. What is the advantage/difference compared to a :shared variable? What is the efficiency impact?

        The actual code was simplified to caricature so that problems and bugs could occur rapidly.

        Sorry. I should have said confusing to me rather than "confused". It was obviously artifacts of the reduction process that was leaving me in the dark as to the wider picture of your code.

        The confused code may be a reference to my fiddling with is_running and is_joinable. This a (feeble?) attempt to protect against programming errors in the workers.... Since this is my first multi-thread Perl script, there may be a much better way to do it. Have you any suggestion?

        The simplest approach to dealing with thread proc errors is to wrap them in block eval. Instead of:

        my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T;

        Use:

        sub threadProc { my $code = shift; for(1 .. 5 ){ my @rv = eval { $code->( @_ ); }; if( $@ ) { ## log errors if applicable ## next if retrying makes sense. ## last if not. tprint "$@; retrying"; } else { return @rv; ## return any return values if applicable. ## think about scalar/list/void context. } } } my @threads = map threads->new( \&threadProc, \&processrefsthread, $Q +), 1 .. $T;

        The retry loop is optional, but demonstrates the possibility. Adding that, plus a random divide-by-zero error in the actual thread proc:

        sub processrefsthread { my $Q = shift; tprint 'Starting'; while( my $job = $Q->dq() ) { tprint "processing job $job"; my $d = int(rand(5)); unless( $d ) { 12345 / $d; } for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } } tprint 'Ending'; }

        And you get:

        C:\test>1056140-buk Useless use of division (/) in void context at C:\test\1056140-buk.p [1] Starting [2] Starting [3] Starting [4] Starting [2] processing job 1 [1] processing job 2 [3] processing job 3 [4] processing job 4 [4] processing job 5 [4] processing job 6 [1] processing job 7 [3] processing job 8 [4] processing job 9 [4] Illegal division by zero at C:\test\1056140-buk.pl line 25. ; retrying [4] Starting [2] processing job 10 [4] processing job 11 [2] Illegal division by zero at C:\test\1056140-buk.pl line 25. ; retrying [2] Starting [2] processing job 12 [3] processing job 13 [2] processing job 14 [1] processing job 15 [1] Illegal division by zero at C:\test\1056140-buk.pl line 25. ; retrying [1] Starting ...

        Thus errors are trapped and diagnosed at source and recovery can be attempted without having to try and detect and divine reasoning remotely.

        Concerning the logxxx, these are thread-safe (I hope) subs to write to the common screen.

        A couple of thoughts:

        • Rather than passing an arbitrary thread id into your threads and then passing that through to your log subs, use threads->tid to query the system provided, monotonically increasing tid within the log subs.
        • In general, having in-place, scrolling messages on a screen that might be overwritten before they are noticed is historically a bad idea.

          Consider logging the verbose version of the messages to a file, and only very short status changes to the screen.

          Set and remember the highlight color of the most severe status for that job; and retain it even when subsequent status update the display. This way, even though the message the operator sees is a low priority 'done' message, the color (say red) tells them that something they perhaps missed occurred earlier and that they should investigate the log to find out what.

        Your latest code shows you passed a reference to the queue as a thread argument. What is the advantage/difference compared to a :shared variable? What is the efficiency impact?

        From an efficiency point of view, whether explicitly shared (via: my $Q :shared = ... ), or implicitly cloned via closure:

        my $Q - ...; sub thread { while( $Q->dq( ) ) ... } }

        or implicitly shared via argument passing as I showed, it makes little or no difference. In all three cases we are manipulating a reference to an explictly shared array (within the Q module), and the runtime effect is pretty much the same.

        The main advantages of using the implicitly shared reference via argument passing is the same as preferring to pass arguments to subroutines rather than have them access global variables. Ie. scoping; action at a distance; visibility etc.

        The advantages of my self-limiting queue implementation over your externally-limited implementation are:

        1. The logic is self-contained inside the Q implementation.

          Neither the producer no consumer code needs know or consider anything about the queue size growth, thus they are simpler and cleaner to program.

        2. Only one locking/syncing mechanism is required.

          This is the biggy!

          Inter-thread comms (ITC) mechanisms like queues already require a locking mechanism to protect their internals against simultaneous access.

          The underlying locking mechanism used by threads::shared and Thread::Semaphore (cond_vars) are very expensive. This is because every action -- set, clear, test or wait -- requires a call into the kernel. Calls into the kernel -- even if only to do an operation that doesn't require memory/instruction fencing or pipeline flushing, eg. testing a bit -- are very expensive because they involve ring 3 - ring 0 - ring 3 transitions which have been measured "to cost 1000-1500 cycles on most machines.".

          Using one locking mechanism is costly enough, but (in Perl/threads at least) unavoidable. But using 2 is far more than just twice as costly, because the two can interact badly to become very costly indeed. Understanding how this comes about is quite difficult to follow; very hard to explain; and impossible to demonstrate. Please either take my word for it; or do your own research :)

        The upshot is that I've never used Thread::Semaphore; I had to install it to run your code, and I will be uninstalling it immediately. Not because it is badly implemented -- from a cursory glance it looks just fine -- but because there is nothing it can do for me that I cannot do better with the underlying locking mechanism applied directly to the shared resource to be controlled.

        But if I use it, then I will have to use two separate locking mechanisms to control that single resource -- the use of threads::shared::lock() is unavoidable. And applying two locking mechanisms to a single resource is always a recipe for slow code at best; and dead/livelocks and priority inversions all too frequently.

        I highly recommend that you drop your use of Thread::Semaphore and adopt my threads::Q implementation. The latter is now very well tested and proven and simplifies your use-case beyond measure.


        With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://1056197]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others scrutinizing the Monastery: (5)
As of 2014-09-18 02:53 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    How do you remember the number of days in each month?











    Results (105 votes), past polls