Beefy Boxes and Bandwidth Generously Provided by pair Networks
Don't ask to ask, just ask
 
PerlMonks  

How can I force thread switching?

by ajl52 (Novice)
on Sep 28, 2013 at 15:06 UTC ( [id://1056140]=perlquestion: print w/replies, xml ) Need Help??

ajl52 has asked for the wisdom of the Perl Monks concerning the following question:

Dear Perl monks,

I have sped up a time-consuming task through concurrent execution of several instances of the same code ("crew" thread model). It is quite effective since time to complete the task decreases from 2 hours 33 minutes to 56 minutes.

The question is: How can protect against ALL threads erroring out?

Basically my code queues the jobs to be done with throttling control through a semaphore to prevent memory flooding with pending jobs:

sub queueProcessRequest { my ($job) = @_; $throttle->down(); $dispatcher->enqueue($job); return undef }

Worker thread increments the semaphore as soon as it removes a job from the queue.

As long as there remains a live worker thread, the queue can be emptied and the main thread is not blocked on the semaphore. When it has finished queuing the job, it can check thread states and wait for termination:

sub syncIdle { # Check if any thread errored out my $abort = 0; for my $i (0..$#thread) { if ( !$thread[$i]->is_running() || $thread[$i]->is_joinable() ) { lock($screenaccess); print (STDERR 'ERROR: thread #' , 1+$i , ' encountered a problem while processing file' , "\n" , $log[1+$i] , "\n" , 'Check the cause and eventually report a bug.' , "\n" ); $abort ||= 1; } } if ($abort) { endThreadedOperation(); print (STDERR "${VTred}Flushing and aborting now ...${VTnorm}\ +n"); print (STDERR 'The error message may have scrolled out due to +asynchronous operation. Check.', "\n"); exit(1); } while ($busy || 0 < $dispatcher->pending()) { threads->yield(); # sleep(1); # Retry later } }

However, if ALL worker threads errored out (if a bug is present, it is likely to happen in all threads since they share the same code base), the job queue eventually fills up, the main thread is blocked on the semaphore and never gets the opportunity to call syncIdle()

I tried putting some threads->yield() in adequate locations but on my Fedora Linux yield() is just a no-op (as the manual warns).

I then modified the queueing sub as follows:

sub queueProcessRequest { my ($job) = @_; # If the queue fills up, it may be caused by threads killed # by an error. In this case, we'll be blocked forever below. # Then let's have a look on the threads. if ($queuelen <= $dispatcher->pending()) { # threads->yield(); sleep(1); # Give a chance foreach my $t (@thread) { if ( !$t->is_running() || $t->is_joinable() ) { syncIdle(); # Diagnose and abort } } } $throttle->down(); $dispatcher->enqueue($job); return undef }

This works as intended, BUT ...

It looks like the queue gets filled first. I then see a pause (sleep(1) above) and worker threads get scheduled to do their job. The cycle restarts.

Note: all debugging code not shown in code snippet.

My analysis is thread switching occurs only at sleep() time.

I can't afford to leave such sleep(1) calls in the code since it would mean about 5000 seconds (roughly 1 hour and a half compared to 56 minutes) wasted to wait for scheduling.

I replaced sleep() with usleep() but the delay must also be long enough to give switching a chance (on the order of 15 ms). Unfortunately, the required minimum delay seems to depend on the number of worker threads, the global machine load, ... and is affected by some sort of jitter.

What I need is a way to force thread switching without causing delay so that my job queue can be emptied by the worker threads.

How can I do it?

My design may also be wrong. Is there an alternate suggestion?

Replies are listed 'Best First'.
Re: How can I force thread switching?
by BrowserUk (Patriarch) on Sep 28, 2013 at 17:00 UTC

    If you post the full code -- or better, a cut-down but runnable version of it -- I'll take a look at this, but I'm not going to waste time trying to re-create your code from your description.

    Update: I suspect the problem is that you are trying to do the scheduling yourself, instead of allowing the well-designed and highly evolved scheduler in your operating system do what it is designed to do.

    Using a semaphore in conjunction with a queue does not make a lot of sense on the surface of the scant description you've provided; but I'm reserving judgment until I've seen something I can run.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Hi,

      Trying to streamline my code to make it a standalone sample, I changed a bit the "alarm" criteria from $queuelen <= $throttle->pending() to a non-blocking semaphore decrement $throttle->down_nb().

      It now works as expected without need for sleep(), nor yield().

      However, my demo code shows a bug which is (apparently) not present in production code: one of the threads is silently terminated (without intentional error injection) and I can't find why.

      Code example:

      #!/usr/bin/perl # -*- tab-width: 4 -*- use strict; use threads; use threads::shared; use Thread::Queue; use Thread::Semaphore; use Time::HiRes ('usleep'); my $screenaccess :shared; # Lock for printing to screen my $multi :shared; # Number simultaneous threads my $logline; # Log line content my $logsize; # Log line size (not counting attributes) my @log :shared; # For debugging, incase a thread errors out my $dispatcher :shared; # Job queue my $queuelen; # Max queue length my $throttle :shared; # Queue semaphore my @thread; sub initThreadedOperation { $multi = shift; $queuelen = 1 * $multi; $dispatcher = Thread::Queue->new(); $throttle = Thread::Semaphore->new($queuelen); for (1 .. $multi) { push ( @thread , threads->create (\&processrefsthread, $_) ); } print STDERR scalar(@thread), ' threads created', "\n"; } sub syncIdle { # Check if any thread errored out my $abort = 0; for my $i (0..$#thread) { if ( !$thread[$i]->is_running() || $thread[$i]->is_joinable() ) { lock($screenaccess); print (STDERR 'ERROR: thread #' , 1+$i , ' encountered a problem while processing request +' , "\n" , $log[1+$i] , "\n" , 'Check the cause and eventually report a bug.' , "\n" ); $abort ||= 1; } } if ($abort) { endThreadedOperation(); print (STDERR "Flushing and aborting now ...\n"); print (STDERR 'The error message may have scrolled out due to +asynchronous operation. Check.', "\n"); exit(1); } while (0 < $dispatcher->pending()) { threads->yield(); # sleep(1); # Retry later } } sub endThreadedOperation { $dispatcher->enqueue((undef) x scalar(@thread)); foreach (@thread) { $_->join(); } } sub logStart { my ($where, $text, $attr) = @_; if ( $where < 0 || $where > $multi ) { return; } $logsize = length($text); $logline = $attr . $text; $log[$where] = $logline; # For debugging lock($screenaccess); print (STDERR '++ ' , $text , "\n" ); } sub logFinal { my ($where, $text, $attr) = @_; if ( $where < 0 || $where > $multi ) { return; } $logline .= $attr . $text; lock($screenaccess); print (STDERR '-- ' , $logline , "\n" ); $logline = undef; $log[$where] = undef; } sub processrefsthread { my $logid = shift; logFinal($logid, "Thread #$logid created\n", ''); while (my $job = $dispatcher->dequeue()) { $throttle->up(); # Job removed from queue my $d = int(rand(5)); logStart ( $logid , "$logid processing $job (duration $d)" , '' ); # sleep($d); for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } ####### # Inoculate a bug # # # if (1 == $logid) # { # logFinal($logid, "\nkilling thread #$logid", ''); # my $z = 0; # $z = 1/$z; # # threads->exit(); # } ####### logFinal($logid, ' ... Done', ''); } } sub queueProcessRequest { my ($job) = @_; # If the queue fills up, it may be caused by threads killed # by an error. In this case, we'll be blocked forever below. # Then let's have a look on the threads. # if ($queuelen <= $dispatcher->pending()) { if (!$throttle->down_nb()) { { lock($screenaccess); print STDERR "***** Queue full\n"; } # threads->yield(); # sleep(1); # Give a chance foreach my $t (@thread) { if ( !$t->is_running() || $t->is_joinable() ) { syncIdle(); # Diagnose and abort } } $throttle->down(); } $dispatcher->enqueue($job); return undef } initThreadedOperation(4); for my $i (0..50) { queueProcessRequest ($i); } logFinal(0, "Queueing completed!\n", ''); syncIdle(); endThreadedOperation();

      When run as is, it reports that a (randomly selected) thread is already terminated, before trying to process even its first job.

      If I wrote something wrong, it might happen in the production code

      Note: to cause threads to error out, uncomment the lines in "Inoculate a bug" block

        one of the threads is silently terminated (without intentional error injection) and I can't find why.

        Your first thread to run will always terminate immediately because you are pushing a job id of zero:

        for my $i ( 0 .. 50 ) { queueProcessRequest( $i ); }

        Which when it is received:

        while (my $job = $dispatcher->dequeue()) {

        Test false and terminates the loop.

        Aside from that, your code is very confused. I have no idea what syncIdle is meant to be doing, and removing it, along with various other chunks of code that seem to serve no good purpose, I got to this which runs perfectly:

        #!/usr/bin/perl -lw use strict; use threads; use threads::shared; use Thread::Queue; use Thread::Semaphore; use Time::HiRes ('usleep'); my $screenaccess :shared; # Lock for printing to screen my $multi :shared; # Number simultaneous threads my $logline; # Log line content my $logsize; # Log line size (not counting attributes) my @log :shared; # For debugging, incase a thread errors out my $dispatcher :shared; # Job queue my $queuelen; # Max queue length my $throttle :shared; # Queue semaphore my @thread; my $semStdout :shared; $|++; sub tprint{ my $str = shift; my $tid = threads->tid(); lock $semStdout; print "[$tid] $str"; } sub initThreadedOperation { $multi = shift; $queuelen = 1 * $multi; $dispatcher = Thread::Queue->new(); $throttle = Thread::Semaphore->new($queuelen); for( 1 .. $multi ) { push( @thread, threads->create( \&processrefsthread, $_ ) ); } print STDERR scalar(@thread), ' threads created', "\n"; } sub endThreadedOperation { $dispatcher->enqueue((undef) x scalar(@thread)); foreach (@thread) { $_->join(); } } sub processrefsthread { tprint 'Starting'; while (my $job = $dispatcher->dequeue()) { tprint "processing job $job"; $throttle->up(); # Job removed from queue my $d = int(rand(5)); for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } } tprint 'Ending'; } sub queueProcessRequest { my ($job) = @_; $dispatcher->enqueue($job); $throttle->down(); return undef } initThreadedOperation( 4 ); for my $i ( 1 .. 50 ) { tprint "Qing job $i"; queueProcessRequest( $i ); } endThreadedOperation();

        That said, limiting your queue to 4 elements to feed 4 threads means that your threads are running in lock-step, which pretty much defeats the purpose of using a queue.

        And using Thread::Semaphore as the mechanism for limiting the size of the queue is like hiring the Pinkerton's to manage the queue for the bathroom in the mornings. Not just overkill, but by forcing lock-step (synchronising your threads), throws away much of the gain from asynchronous processing.

        Here's how I would write the same program:

        #!/usr/bin/perl -lw use strict; use threads; use threads::shared; use threads::Q; my $semStdout :shared; $|++; sub tprint{ my $str = shift; my $tid = threads->tid(); lock $semStdout; print "[$tid] $str"; } sub processrefsthread { my $Q = shift; tprint 'Starting'; while( my $job = $Q->dq() ) { tprint "processing job $job"; my $d = int(rand(5)); for my $i (0 .. $d * 500000) { my $j = $i * $d; $j = int(rand($j)); } } tprint 'Ending'; } our $T //= 4; my $Q = threads::Q->new( $T * 4 ); ## Q self-limits to 4*$T elements my @threads = map threads->new( \&processrefsthread, $Q ), 1 .. $T; $Q->nq( $_ ) for 1 .. 50; $Q->nq( (undef) x $T ); $_->join for @threads;

        You can get threads::Q (my own, efficient, self-limiting queue implementation) from here.


        With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.
Re: How can I force thread switching?
by Grimy (Pilgrim) on Sep 28, 2013 at 15:29 UTC
    In Java, calling Thread.yield() (or, alternatively, Thread.sleep(0)) does what you want. I’m not sure how to do the equivalent in Perl, but if sleeping for one micro-second is more acceptable, you may want to look at Time::HiRes.

      I have already explored Time::HiRes since usleep() is defined in this package.

      I also attempted to wait for 0 unit of time (second or microsecond) to no avail. The null delay is probably detected somewhere and optimised out.

      As stated in the manual, yield() is a no-op on most systems.

        I also attempted to wait for 0 unit of time (second or microsecond) to no avail. The null delay is probably detected somewhere and optimised out.

        BrowserUk says yield is a mistake (like sleep 0), so sleep more, like 1/2 (or less) of the minimum time period that you believe it will take, like 33 milliseconds -- seems to hold true on win32, good rule of thumb :)

Re: How can I force thread switching?
by Anonymous Monk on Sep 29, 2013 at 12:39 UTC
    Put the jobs into a queue with no capacity-issues, such as a shared database table. Then, redesign the system so that the threads live forever, retrieving work and doing it until told to stop. The job-scheduling and job-output activities should also be handled by threads. The parent's only duties are to watch the kids.

      Ignore this 'advice'. It's garbage. (You're foolin' noone.)

      Update: I was asked to explain why it is garbage.

      It is garbage because sundia anonymous monk suggests:

      Then, redesign the system so that the threads live forever, retrieving work and doing it until told to stop.

      Which is stupid, because that is exactly what the OPs code does.

      This guy cannot even read code. It should be possible to take out injunctions against morons like him.


      With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.
Re: How can I force thread switching?
by sundialsvc4 (Abbot) on Sep 29, 2013 at 20:36 UTC

    What?   Did I hear my name used in vain, on such a pleasant Sunday morn?

    He has a point:   use a thread to enqueue the jobs for the other threads.   This thread is the “nanny,” whose only purpose is to keep the children fed.   (Or in the good ol’ days, it is the Job Entry Subsystem of what was then called MVS now z/OS.)   It runs until it runs out of food, then quits.   If that thread winds up waiting-forever, so what.

    The bottom line is that the parent does not have to be directly responsible for job-queueing.   The parent does not have to wait for it, and so, job-queueing will occur properly no matter what the user (interface ...) is or is not doing.   It is no longer “the parent’s concern.”   The parent’s only concern, now, is to run the user-interface until all of its children die (or until “the last man standing” is the scheduler).   Although the responsibilities of one particular child are somewhat greater than that of all the others, the parent has no need to “be concerned.”

    It’s very hard, in any language, to have a (especially, to have a parent) thread with two concerns, and also to be both user-facing and peer-facing at the same time.

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: perlquestion [id://1056140]
Front-paged by Corion
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others wandering the Monastery: (6)
As of 2024-04-19 10:58 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found