PerlMonks  

Re^3: How can I force thread switching?

by BrowserUk (Pope)
on Sep 29, 2013 at 19:15 UTC (#1056256)


in reply to Re^2: How can I force thread switching?
in thread How can I force thread switching?

one of the threads is silently terminated (without intentional error injection) and I can't find why.

Your first thread to run will always terminate immediately because you are pushing a job id of zero:

for my $i ( 0 .. 50 ) { queueProcessRequest( $i ); }

Which, when it is received:

while (my $job = $dispatcher->dequeue()) {

tests false and terminates the loop.

Aside from that, your code is very confused. I have no idea what syncIdle is meant to be doing; after removing it, along with various other chunks of code that seemed to serve no good purpose, I got to this, which runs perfectly:

#!/usr/bin/perl -lw
use strict;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;
use Time::HiRes ('usleep');

my $screenaccess :shared;   # Lock for printing to screen
my $multi :shared;          # Number of simultaneous threads
my $logline;                # Log line content
my $logsize;                # Log line size (not counting attributes)
my @log :shared;            # For debugging, in case a thread errors out
my $dispatcher :shared;     # Job queue
my $queuelen;               # Max queue length
my $throttle :shared;       # Queue semaphore
my @thread;
my $semStdout :shared;

$|++;

sub tprint {
    my $str = shift;
    my $tid = threads->tid();
    lock $semStdout;
    print "[$tid] $str";
}

sub initThreadedOperation {
    $multi = shift;
    $queuelen = 1 * $multi;
    $dispatcher = Thread::Queue->new();
    $throttle = Thread::Semaphore->new( $queuelen );
    for ( 1 .. $multi ) {
        push @thread, threads->create( \&processrefsthread, $_ );
    }
    print STDERR scalar(@thread), ' threads created', "\n";
}

sub endThreadedOperation {
    $dispatcher->enqueue( (undef) x scalar(@thread) );
    foreach (@thread) { $_->join(); }
}

sub processrefsthread {
    tprint 'Starting';
    while (my $job = $dispatcher->dequeue()) {
        tprint "processing job $job";
        $throttle->up();    # Job removed from queue
        my $d = int(rand(5));
        for my $i (0 .. $d * 500000) {
            my $j = $i * $d;
            $j = int(rand($j));
        }
    }
    tprint 'Ending';
}

sub queueProcessRequest {
    my ($job) = @_;
    $dispatcher->enqueue($job);
    $throttle->down();
    return undef;
}

initThreadedOperation( 4 );
for my $i ( 1 .. 50 ) {
    tprint "Qing job $i";
    queueProcessRequest( $i );
}
endThreadedOperation();

That said, limiting your queue to 4 elements to feed 4 threads means that your threads are running in lock-step, which pretty much defeats the purpose of using a queue.

And using Thread::Semaphore as the mechanism for limiting the size of the queue is like hiring the Pinkertons to manage the queue for the bathroom in the mornings. It is not just overkill: by forcing lock-step (synchronising your threads), it throws away much of the gain from asynchronous processing.

Here's how I would write the same program:

#!/usr/bin/perl -lw
use strict;
use threads;
use threads::shared;
use threads::Q;

my $semStdout :shared;
$|++;

sub tprint {
    my $str = shift;
    my $tid = threads->tid();
    lock $semStdout;
    print "[$tid] $str";
}

sub processrefsthread {
    my $Q = shift;
    tprint 'Starting';
    while( my $job = $Q->dq() ) {
        tprint "processing job $job";
        my $d = int(rand(5));
        for my $i (0 .. $d * 500000) {
            my $j = $i * $d;
            $j = int(rand($j));
        }
    }
    tprint 'Ending';
}

our $T //= 4;
my $Q = threads::Q->new( $T * 4 );  ## Q self-limits to 4*$T elements

my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T;

$Q->nq( $_ ) for 1 .. 50;
$Q->nq( (undef) x $T );

$_->join for @threads;

You can get threads::Q (my own, efficient, self-limiting queue implementation) from here.


With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.


Re^4: How can I force thread switching?
by ajl52 (Novice) on Sep 30, 2013 at 08:54 UTC

    Your first thread to run will always terminate immediately because you are pushing a job id of zero:

    Shame on me! Now that you make the remark, it is obvious.

    Aside from that, your code is very confused.

    The actual code was simplified to caricature so that problems and bugs could occur rapidly.

    For instance, the queue length was made equal to the number of workers to reach saturation in a jiffy (have you noticed 1 * $multi, which is totally silly?).

    Along the same lines, the job to be done was replaced by a number (forgetting that 0 == false, gasp!). In production code, it is a reference to a list containing various objects: strings and hashes. Since the hashes are rather big, I preferred to add a semaphore to slow down the dispatcher (it runs like lightning and could stuff 50k requests into the queue in an instant) because I do not want to flood memory uselessly (to be memory-friendly). Of course, once the full process has warmed up, the dispatcher queues requests only one at a time, as the workers remove one job from the queue. But the queue is long enough (2-3 x workers) to avoid starvation.

    The confused code may be a reference to my fiddling with is_running and is_joinable. This is a (feeble?) attempt to protect against programming errors in the workers. I noticed that if a worker crashes, I get error messages about unfinished threads when the main process terminates. The idea is to note that something weird happened and to be able to print a message that catches the user's attention, because the original (Perl) error message may have scrolled away long ago.

    When this condition is detected, syncIdle is launched to wait for normal termination of the active threads. syncIdle is also called by the dispatcher when it has finished queueing all requests, to wait for them to complete processing before doing something else.

    The situation is even more serious if all the workers crashed. In this case, the queue will never be emptied. I need a way to regain control in the main thread (or somewhere else), otherwise I'm deadlocked.

    Since this is my first multi-thread Perl script, there may be a much better way to do it. Have you any suggestion?

    Concerning the logxxx, these are thread-safe (I hope) subs to write to the common screen. Every worker has a dedicated line on screen to log its activity. What was not shown is the role of the third argument $attr. It contains ANSI escape code to highlight the text in the second argument. It is not included in the text, so that the number of columns used on screen may be easily computed from the length of text (which is not supposed to contain zero-width Unicode characters or other multi-byte characters). This allows logAdd and logFinal to add their text at the tail of what is already on screen.

    Anyway, a big thank-you for the help.

    Your latest code shows you passed a reference to the queue as a thread argument. What is the advantage/difference compared to a :shared variable? What is the efficiency impact?

      The actual code was simplified to caricature so that problems and bugs could occur rapidly.

      Sorry. I should have said confusing to me rather than "confused". It was obviously artifacts of the reduction process that were leaving me in the dark as to the wider picture of your code.

      The confused code may be a reference to my fiddling with is_running and is_joinable. This is a (feeble?) attempt to protect against programming errors in the workers.... Since this is my first multi-thread Perl script, there may be a much better way to do it. Have you any suggestion?

      The simplest approach to dealing with thread-proc errors is to wrap them in a block eval. Instead of:

      my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T;

      Use:

      sub threadProc {
          my $code = shift;
          for ( 1 .. 5 ) {
              my @rv = eval { $code->( @_ ); };
              if( $@ ) {
                  ## log errors if applicable
                  ## next if retrying makes sense.
                  ## last if not.
                  tprint "$@; retrying";
              }
              else {
                  return @rv;  ## return any return values if applicable.
                               ## think about scalar/list/void context.
              }
          }
      }

      my @threads = map threads->new( \&threadProc, \&processrefsthread, $Q ), 1 .. $T;

      The retry loop is optional, but demonstrates the possibility. Adding that, plus a random divide-by-zero error in the actual thread proc:

      sub processrefsthread {
          my $Q = shift;
          tprint 'Starting';
          while( my $job = $Q->dq() ) {
              tprint "processing job $job";
              my $d = int(rand(5));
              unless( $d ) { 12345 / $d; }
              for my $i (0 .. $d * 500000) {
                  my $j = $i * $d;
                  $j = int(rand($j));
              }
          }
          tprint 'Ending';
      }

      And you get:

      C:\test>1056140-buk
      Useless use of division (/) in void context at C:\test\1056140-buk.pl line 25.
      [1] Starting
      [2] Starting
      [3] Starting
      [4] Starting
      [2] processing job 1
      [1] processing job 2
      [3] processing job 3
      [4] processing job 4
      [4] processing job 5
      [4] processing job 6
      [1] processing job 7
      [3] processing job 8
      [4] processing job 9
      [4] Illegal division by zero at C:\test\1056140-buk.pl line 25.
      ; retrying
      [4] Starting
      [2] processing job 10
      [4] processing job 11
      [2] Illegal division by zero at C:\test\1056140-buk.pl line 25.
      ; retrying
      [2] Starting
      [2] processing job 12
      [3] processing job 13
      [2] processing job 14
      [1] processing job 15
      [1] Illegal division by zero at C:\test\1056140-buk.pl line 25.
      ; retrying
      [1] Starting
      ...

      Thus errors are trapped and diagnosed at source, and recovery can be attempted, without having to detect them and divine their cause remotely.

      Concerning the logxxx, these are thread-safe (I hope) subs to write to the common screen.

      A couple of thoughts:

      • Rather than passing an arbitrary thread id into your threads and then passing that through to your log subs, use threads->tid to query the system-provided, monotonically-increasing tid within the log subs.
      • In general, having in-place, scrolling messages on a screen that might be overwritten before they are noticed is historically a bad idea.

        Consider logging the verbose version of the messages to a file, and only very short status changes to the screen.

        Set and remember the highlight color of the most severe status for that job, and retain it even when subsequent status changes update the display. This way, even though the message the operator sees is a low-priority 'done' message, the color (say, red) tells them that something they perhaps missed occurred earlier, and that they should investigate the log to find out what.
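        That "remember the worst status" idea can be sketched in a few lines (all names here -- %SEVERITY, @COLOUR, job_colour -- are illustrative, not from the original code):

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Map each status to a severity, and each severity to an SGR colour code.
my %SEVERITY = ( done => 0, warn => 1, error => 2 );
my @COLOUR   = ( "\e[32m", "\e[33m", "\e[31m" );   # green, yellow, red

my %worst;   # per-job, most severe level seen so far

sub job_colour {
    my( $job, $status ) = @_;
    my $sev = $SEVERITY{ $status } // 0;
    # Ratchet upward only: once a job has gone red, it stays red.
    $worst{ $job } = $sev
        if !exists $worst{ $job } or $sev > $worst{ $job };
    return $COLOUR[ $worst{ $job } ];
}

# An early error keeps the line red even after a later 'done':
job_colour( 7, 'error' );                # red
my $final = job_colour( 7, 'done' );     # still red: "\e[31m"
```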

      Your latest code shows you passed a reference to the queue as a thread argument. What is the advantage/difference compared to a :shared variable? What is the efficiency impact?

      From an efficiency point of view, whether explicitly shared (via: my $Q :shared = ... ), or implicitly cloned via closure:

      my $Q = ...;

      sub thread {
          while( $Q->dq() ) {
              ...
          }
      }

      or implicitly shared via argument passing as I showed, it makes little or no difference. In all three cases we are manipulating a reference to an explicitly shared array (within the Q module), and the runtime effect is pretty much the same.

      The main advantage of using the implicitly shared reference via argument passing is the same as that of preferring to pass arguments to subroutines rather than have them access global variables: scoping, avoiding action at a distance, visibility, etc.
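      For concreteness, here is a minimal sketch of the argument-passing style using the stock Thread::Queue module (the worker sub and the job-counting body are mine, for illustration only):

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# The queue reference passed to threads->create() is the worker's only
# link to the outside world: no globals, no closures over shared state.
sub worker {
    my $Q = shift;
    my $done = 0;
    while ( defined( my $job = $Q->dequeue() ) ) {
        ++$done;                      # stand-in for real work
    }
    return $done;
}

my $Q = Thread::Queue->new();
my $t = threads->create( \&worker, $Q );
$Q->enqueue( 1 .. 10, undef );        # undef terminates the worker
my $processed = $t->join();           # → 10
```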

      The advantages of my self-limiting queue implementation over your externally-limited implementation are:

      1. The logic is self-contained inside the Q implementation.

        Neither the producer nor the consumer code needs to know or consider anything about the queue's size or growth, so both are simpler and cleaner to program.

      2. Only one locking/syncing mechanism is required.

        This is the biggy!

        Inter-thread comms (ITC) mechanisms like queues already require a locking mechanism to protect their internals against simultaneous access.

        The underlying locking mechanism used by threads::shared and Thread::Semaphore (cond_vars) is very expensive. This is because every action -- set, clear, test or wait -- requires a call into the kernel. Calls into the kernel -- even if only to do an operation that doesn't require memory/instruction fencing or pipeline flushing, e.g. testing a bit -- are very expensive because they involve ring 3 -> ring 0 -> ring 3 transitions, which have been measured "to cost 1000-1500 cycles on most machines".

        Using one locking mechanism is costly enough, but (in Perl/threads at least) unavoidable. Using two is far more than twice as costly, because the two can interact badly and become very costly indeed. Understanding how this comes about is quite difficult; it is very hard to explain and impossible to demonstrate briefly. Please either take my word for it, or do your own research :)
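        To illustrate what "only one locking/syncing mechanism" buys you, here is a minimal self-limiting queue sketch built on a single lock plus cond_wait/cond_broadcast. This is my illustration of the idea, not threads::Q's actual implementation; one lock on the object serves both the "queue full" and "queue empty" conditions:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;

package MiniQ;

sub new {
    my( $class, $max ) = @_;
    my %self :shared = ( max => $max );
    $self{q} = threads::shared::shared_clone( [] );
    return bless \%self, $class;
}

sub nq {                                    # enqueue; blocks while full
    my( $self, @items ) = @_;
    lock %$self;
    for my $item ( @items ) {
        cond_wait %$self while @{ $self->{q} } >= $self->{max};
        push @{ $self->{q} }, $item;
    }
    cond_broadcast %$self;                  # wake any waiting consumers
}

sub dq {                                    # dequeue; blocks while empty
    my $self = shift;
    lock %$self;
    cond_wait %$self until @{ $self->{q} };
    my $item = shift @{ $self->{q} };
    cond_broadcast %$self;                  # wake any waiting producers
    return $item;
}

package main;

my $q   = MiniQ->new( 4 );
$q->nq( 'job' );
my $got = $q->dq();                         # → 'job'
```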

      The upshot is that I've never used Thread::Semaphore; I had to install it to run your code, and I will be uninstalling it immediately. Not because it is badly implemented -- from a cursory glance it looks just fine -- but because there is nothing it can do for me that I cannot do better with the underlying locking mechanism applied directly to the shared resource to be controlled.

      But if I use it, then I will have to use two separate locking mechanisms to control that single resource -- the use of threads::shared::lock() is unavoidable. And applying two locking mechanisms to a single resource is always a recipe for slow code at best; and dead/livelocks and priority inversions all too frequently.

      I highly recommend that you drop your use of Thread::Semaphore and adopt my threads::Q implementation. The latter is now very well tested and proven and simplifies your use-case beyond measure.



        Thanks for the eval { ... } suggestion, I'll experiment.

        The reason why I use my own thread ids is that I want to be 100% sure they lie in the 1..number_of_threads range (0 is used for scrolling messages).

        What is my concern?

        Once again, I did not give the full picture.

        As soon as I start multi-thread operation, the screen is split into an upper scrolling region and a lower non-scrolling status area where each line "belongs" to a thread. (*)

        Thus, I have the traditional scrolling information area for routine messages and a steady area where "accidents" can be shown for a longer period of time. Of course, a reminder summary is printed when everything is done.

        Now, since my thread id is used to index the private line, this id MUST stay in the desired range.

        Your Q implementation is exactly what I need. Since I'm not the only one to need this feature, I wonder why it is not included in the standard queue package. All your arguments make great sense. My concern is speed (I already mentioned that the whole process improved from 2:30 hours to 57 minutes) and I'd like to squeeze all inefficiencies out of my code, though there are very clumsy implementations in other sections (notably when it comes to parsing text).

        (*) This heavily uses ANSI escape codes. I made the assumption that current terminal emulators implement the most usual sequences (according to Wikipedia) and avoided "cute" sequences tied to a specific implementation. I allowed myself back/foreground colours and cursor addressing.
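        A minimal sketch of such a split-screen setup, using only the widely-implemented DECSTBM (scrolling region) and CUP (cursor position) sequences. The row count, sub name and layout are my assumptions, not the poster's actual code:

```perl
#!/usr/bin/perl
use strict;
use warnings;

my $nthreads = 4;
my $rows     = 24;                        # assume a 24-row terminal
my $status0  = $rows - $nthreads + 1;     # first non-scrolling status row

# Restrict scrolling to rows 1 .. $status0-1; the status area stays put.
printf "\e[%d;%dr", 1, $status0 - 1;      # DECSTBM: set scrolling region

sub status_line {
    my( $tid, $msg, $attr ) = @_;         # $attr: optional SGR, e.g. "\e[31m"
    $attr //= '';
    # Save cursor, jump to the thread's private row, clear it, write the
    # message, reset attributes, restore cursor -- the scroll region is
    # left untouched.
    my $seq = sprintf "\e7\e[%d;1H\e[2K%s[%d] %s\e[0m\e8",
                      $status0 + $tid - 1, $attr, $tid, $msg;
    print $seq;
    return $seq;
}

status_line( 2, 'processing job 11' );
status_line( 3, 'Illegal division by zero', "\e[31m" );
```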
