PerlMonks  

Re: How can I force thread switching?

by BrowserUk (Pope)
on Sep 28, 2013 at 17:00 UTC ( #1056152=note )


in reply to How can I force thread switching?

If you post the full code -- or better, a cut-down but runnable version of it -- I'll take a look at this, but I'm not going to waste time trying to re-create your code from your description.

Update: I suspect the problem is that you are trying to do the scheduling yourself, instead of allowing the well-designed and highly evolved scheduler in your operating system to do what it is designed to do.

Using a semaphore in conjunction with a queue does not make a lot of sense on the surface of the scant description you've provided; but I'm reserving judgment until I've seen something I can run.


With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.


Re^2: How can I force thread switching?
by ajl52 (Novice) on Sep 29, 2013 at 09:04 UTC

    Hi,

    While streamlining my code into a standalone sample, I slightly changed the "alarm" criterion from $queuelen <= $throttle->pending() to a non-blocking semaphore decrement, $throttle->down_nb().

    It now works as expected without need for sleep(), nor yield().
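
    For readers unfamiliar with it, the behaviour of down_nb() can be sketched as follows. This is a minimal illustration, not the production code: the initial count of 2 and the variable names are assumptions chosen for the demo.

```perl
use strict;
use warnings;
use threads;
use Thread::Semaphore;

# Semaphore with two free slots (the count 2 is an arbitrary demo value).
my $throttle = Thread::Semaphore->new(2);

# down_nb() decrements and returns true if a slot is free;
# otherwise it returns false immediately instead of blocking.
my @attempts = map { $throttle->down_nb() ? 1 : 0 } 1 .. 3;

# Releasing one slot lets the next non-blocking attempt succeed.
$throttle->up();
my $after_up = $throttle->down_nb() ? 1 : 0;

print "attempts: @attempts, after up: $after_up\n";
```

    The third attempt fails because both slots are taken; that failure is the "queue full" signal used in the code below.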

    However, my demo code shows a bug which is (apparently) not present in production code: one of the threads is silently terminated (without intentional error injection) and I can't find why.

    Code example:

    #!/usr/bin/perl
    # -*- tab-width: 4 -*-

    use strict;
    use threads;
    use threads::shared;
    use Thread::Queue;
    use Thread::Semaphore;
    use Time::HiRes ('usleep');

    my $screenaccess :shared;   # Lock for printing to screen
    my $multi :shared;          # Number simultaneous threads
    my $logline;                # Log line content
    my $logsize;                # Log line size (not counting attributes)
    my @log :shared;            # For debugging, incase a thread errors out
    my $dispatcher :shared;     # Job queue
    my $queuelen;               # Max queue length
    my $throttle :shared;       # Queue semaphore
    my @thread;

    sub initThreadedOperation {
        $multi = shift;
        $queuelen = 1 * $multi;
        $dispatcher = Thread::Queue->new();
        $throttle = Thread::Semaphore->new($queuelen);
        for (1 .. $multi) {
            push ( @thread , threads->create (\&processrefsthread, $_) );
        }
        print STDERR scalar(@thread), ' threads created', "\n";
    }

    sub syncIdle {
        # Check if any thread errored out
        my $abort = 0;
        for my $i (0..$#thread) {
            if ( !$thread[$i]->is_running() || $thread[$i]->is_joinable() ) {
                lock($screenaccess);
                print (STDERR 'ERROR: thread #'
                        , 1+$i
                        , ' encountered a problem while processing request'
                        , "\n"
                        , $log[1+$i]
                        , "\n"
                        , 'Check the cause and eventually report a bug.'
                        , "\n"
                        );
                $abort ||= 1;
            }
        }
        if ($abort) {
            endThreadedOperation();
            print (STDERR "Flushing and aborting now ...\n");
            print (STDERR 'The error message may have scrolled out due to asynchronous operation. Check.', "\n");
            exit(1);
        }
        while (0 < $dispatcher->pending()) {
            threads->yield();
            # sleep(1); # Retry later
        }
    }

    sub endThreadedOperation {
        $dispatcher->enqueue((undef) x scalar(@thread));
        foreach (@thread) {
            $_->join();
        }
    }

    sub logStart {
        my ($where, $text, $attr) = @_;
        if ( $where < 0 || $where > $multi ) {
            return;
        }
        $logsize = length($text);
        $logline = $attr . $text;
        $log[$where] = $logline; # For debugging
        lock($screenaccess);
        print (STDERR '++ ' , $text , "\n" );
    }

    sub logFinal {
        my ($where, $text, $attr) = @_;
        if ( $where < 0 || $where > $multi ) {
            return;
        }
        $logline .= $attr . $text;
        lock($screenaccess);
        print (STDERR '-- ' , $logline , "\n" );
        $logline = undef;
        $log[$where] = undef;
    }

    sub processrefsthread {
        my $logid = shift;
        logFinal($logid, "Thread #$logid created\n", '');
        while (my $job = $dispatcher->dequeue()) {
            $throttle->up(); # Job removed from queue
            my $d = int(rand(5));
            logStart ( $logid , "$logid processing $job (duration $d)" , '' );
            # sleep($d);
            for my $i (0 .. $d * 500000) {
                my $j = $i * $d;
                $j = int(rand($j));
            }
            #######
            # Inoculate a bug
            #
            #
            # if (1 == $logid)
            # {
            #     logFinal($logid, "\nkilling thread #$logid", '');
            #     my $z = 0;
            #     $z = 1/$z;
            #     # threads->exit();
            # }
            #######
            logFinal($logid, ' ... Done', '');
        }
    }

    sub queueProcessRequest {
        my ($job) = @_;
        # If the queue fills up, it may be caused by threads killed
        # by an error. In this case, we'll be blocked forever below.
        # Then let's have a look on the threads.
        # if ($queuelen <= $dispatcher->pending()) {
        if (!$throttle->down_nb()) {
            {
                lock($screenaccess);
                print STDERR "***** Queue full\n";
            }
            # threads->yield();
            # sleep(1); # Give a chance
            foreach my $t (@thread) {
                if ( !$t->is_running() || $t->is_joinable() ) {
                    syncIdle(); # Diagnose and abort
                }
            }
            $throttle->down();
        }
        $dispatcher->enqueue($job);
        return undef
    }

    initThreadedOperation(4);
    for my $i (0..50) {
        queueProcessRequest ($i);
    }
    logFinal(0, "Queueing completed!\n", '');
    syncIdle();
    endThreadedOperation();

    When run as is, it reports that a (randomly selected) thread is already terminated, before trying to process even its first job.

    If I wrote something wrong here, the same mistake might be lurking in the production code.

    Note: to make threads error out, uncomment the lines in the "Inoculate a bug" block.

      one of the threads is silently terminated (without intentional error injection) and I can't find why.

      Your first thread to run will always terminate immediately because you are pushing a job id of zero:

      for my $i ( 0 .. 50 ) { queueProcessRequest( $i ); }

      Which when it is received:

      while (my $job = $dispatcher->dequeue()) {

      Tests false and terminates the loop.
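
      A common way around this, sketched here under the assumption that undef is used as the "no more work" marker (as in the code above), is to test the dequeued value with defined() so that a job of 0 is still processed. The sub name drain_queue and the job values are purely illustrative:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Worker loop that stops only on undef, so a job value of 0 (or '')
# is processed like any other job.
sub drain_queue {
    my ($q) = @_;
    my @seen;
    while (defined(my $job = $q->dequeue())) {
        push @seen, $job;
    }
    return @seen;
}

my $q = Thread::Queue->new();

# Ask for list context so join() hands back every job the worker saw.
my $worker = threads->create({ context => 'list' }, \&drain_queue, $q);

$q->enqueue($_) for 0 .. 3;   # includes the troublesome job id 0
$q->enqueue(undef);           # terminator

my @processed = $worker->join();
print "processed: @processed\n";
```

      With the plain truthiness test, the same worker would have returned before processing anything, because the first job it dequeues is 0.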

      Aside from that, your code is very confused. I have no idea what syncIdle is meant to be doing, and removing it, along with various other chunks of code that seem to serve no good purpose, I got to this which runs perfectly:

      That said, limiting your queue to 4 elements to feed 4 threads means that your threads are running in lock-step, which pretty much defeats the purpose of using a queue.

      And using Thread::Semaphore as the mechanism for limiting the size of the queue is like hiring the Pinkertons to manage the queue for the bathroom in the mornings. It is not just overkill: by forcing lock-step (synchronising your threads), it throws away much of the gain from asynchronous processing.

      Here's how I would write the same program:

      #!/usr/bin/perl -lw
      use strict;
      use threads;
      use threads::shared;
      use threads::Q;

      my $semStdout :shared;
      $|++;
      sub tprint{
          my $str = shift;
          my $tid = threads->tid();
          lock $semStdout;
          print "[$tid] $str";
      }

      sub processrefsthread {
          my $Q = shift;
          tprint 'Starting';
          while( my $job = $Q->dq() ) {
              tprint "processing job $job";
              my $d = int(rand(5));
              for my $i (0 .. $d * 500000) {
                  my $j = $i * $d;
                  $j = int(rand($j));
              }
          }
          tprint 'Ending';
      }

      our $T //= 4;

      my $Q = threads::Q->new( $T * 4 ); ## Q self-limits to 4*$T elements

      my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T;

      $Q->nq( $_ ) for 1 .. 50;
      $Q->nq( (undef) x $T );

      $_->join for @threads;

      You can get threads::Q (my own, efficient, self-limiting queue implementation) from here.
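
      If threads::Q is not at hand, a similar self-limiting queue can be sketched with the core Thread::Queue module. This is an alternative, not the code above: it assumes a Thread::Queue recent enough to provide limit and end (both arrived in the 3.x releases, so check the documentation of your installed version), and the worker count and the "work" are placeholders.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $WORKERS = 4;    # hypothetical worker count

# Each worker counts the jobs it handled; the counter stands in for real work.
sub worker {
    my ($q) = @_;
    my $count = 0;
    while (defined(my $job = $q->dequeue())) {
        $count++;
    }
    return $count;
}

my $q = Thread::Queue->new();
$q->limit = $WORKERS * 4;   # enqueue() blocks while this many jobs are pending

my @workers = map {
    threads->create({ context => 'scalar' }, \&worker, $q)
} 1 .. $WORKERS;

$q->enqueue($_) for 1 .. 50;
$q->end();                  # dequeue() returns undef once the queue drains

my $total = 0;
$total += $_->join() for @workers;
print "jobs processed: $total\n";
```

      Here the producer simply blocks inside enqueue() when the queue is full, so no separate semaphore is needed.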



        Your first thread to run will always terminate immediately because you are pushing a job id of zero:

        Shame on me! Now that you point it out, it is obvious.

        Aside from that, your code is very confused.

        The actual code was simplified to caricature so that problems and bugs could occur rapidly.

        For instance, the queue length was made equal to the number of workers to reach saturation in a jiffy (you may have noticed 1 * $multi, which is totally silly).

        Along the same lines, the job to be done was replaced by a number (forgetting that 0 is false, gasp!). In production code, it is a reference to a list containing various objects: strings and hashes. Since the hashes are rather big, I preferred to add a semaphore to slow down the dispatcher (it runs like lightning and could stuff 50k requests into the queue in an instant), because I do not want to flood memory needlessly. Of course, once the whole process has warmed up, the dispatcher queues requests only one at a time, as the workers remove jobs from the queue. But the queue is long enough (2-3 x the number of workers) to avoid starving them.

        The "confused code" remark may refer to my fiddling with is_running and is_joinable. This is a (feeble?) attempt to protect against programming errors in the workers. I noticed that if a worker crashes, I get error messages about unfinished threads when the main process terminates. The idea is to note that something weird happened and to print a message that catches the user's attention, because the original (Perl) error message may have scrolled away long before.

        When this condition is detected, syncIdle is called to wait for the active threads to terminate normally. syncIdle is also called by the dispatcher once it has finished queueing all requests, to wait until they have all been processed before doing something else.

        The situation is even more serious if all workers crashed. In this case, the queue will never be emptied. I need a way to regain control in the main thread (or somewhere else) otherwise I'm deadlocked.

        Since this is my first multi-thread Perl script, there may be a much better way to do it. Have you any suggestion?
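
        One possible direction, offered only as a sketch: trap errors inside the worker with eval so that a failing job does not kill the thread, and report the outcome on a second queue. The names handle_job, $jobs and $results, and the simulated failure on job 3, are all hypothetical and not taken from the production code.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical job handler: dies on job 3 to simulate a programming error.
sub handle_job {
    my ($job) = @_;
    die "simulated failure on job $job\n" if $job == 3;
    return $job * 2;
}

# The worker survives a failing job and reports every outcome.
sub worker {
    my ($jobs, $results) = @_;
    while (defined(my $job = $jobs->dequeue())) {
        my $out;
        if (eval { $out = handle_job($job); 1 }) {
            $results->enqueue("ok $job $out");
        }
        else {
            chomp(my $err = $@);
            $results->enqueue("error $job $err");
        }
    }
    return;
}

my $jobs    = Thread::Queue->new();
my $results = Thread::Queue->new();
my @workers = map { threads->create(\&worker, $jobs, $results) } 1 .. 2;

$jobs->enqueue(1 .. 5);
$jobs->enqueue((undef) x scalar(@workers));   # one terminator per worker
$_->join() for @workers;

# All workers have finished, so a non-blocking drain sees every report.
my @report;
while (defined(my $line = $results->dequeue_nb())) {
    push @report, $line;
}
my @errors = grep { /^error/ } @report;
print scalar(@report), " jobs reported, ", scalar(@errors), " failed\n";
```

        Because no worker dies, the job queue keeps being emptied, and the main thread can print the collected errors at the end, where they will not scroll away.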

        Concerning the logxxx subs, these are thread-safe (I hope) subs that write to the common screen. Every worker has a dedicated line on screen to log its activity. What was not shown is the role of the third argument, $attr. It contains an ANSI escape code to highlight the text in the second argument. It is kept separate from the text so that the number of columns used on screen can easily be computed from the length of the text (which is not supposed to contain zero-width Unicode characters or other multi-byte characters). This allows logAdd and logFinal to add their text at the tail of what is already on screen.

        Anyway, a big thank-you for the help.

        Your latest code shows you passed a reference to the queue as a thread argument. What is the advantage/difference compared to a :shared variable? What is the efficiency impact?
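
        As far as the queue itself is concerned, both styles reach the same underlying shared queue, which the following sketch tries to show. It says nothing about relative efficiency; the sub names via_global and via_arg and the two string jobs are made up for the demo.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# A file-scoped queue: threads created later see the same shared queue.
my $global_q = Thread::Queue->new();

# Reads from the queue through the file-scoped variable.
sub via_global { return $global_q->dequeue(); }

# Reads from the queue handed in as a thread argument.
sub via_arg { my ($q) = @_; return $q->dequeue(); }

$global_q->enqueue('a', 'b');

# The threads run one after the other, so the order of the results is fixed.
my $got_global = threads->create({ context => 'scalar' }, \&via_global)->join();
my $got_arg    = threads->create({ context => 'scalar' }, \&via_arg, $global_q)->join();

print "global: $got_global, argument: $got_arg, left: ",
      $global_q->pending(), "\n";
```

        Passing the queue as an argument mainly keeps the worker sub free of file-scoped state, which makes it easier to reuse with several queues.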
