PerlMonks  


Hi,

While streamlining my code into a standalone sample, I slightly changed the "alarm" criterion from the queue-length test $queuelen <= $dispatcher->pending() to a non-blocking semaphore decrement, $throttle->down_nb().

It now works as expected without needing sleep() or yield().
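The throttling scheme described above can be sketched in isolation as follows. This is a minimal sketch, not the production code: the helper names worker, submit and run_demo are made up for illustration, the "work" is just a counter queue, and it assumes a perl built with thread support. The semaphore counts free queue slots: the producer takes one with down_nb() (falling back to a blocking down() when the queue is full), and each worker gives one back with up() as soon as it has dequeued a job.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Thread::Semaphore;

# Hypothetical helper names (worker, submit, run_demo); not the production code.

# Worker: each dequeue frees one slot by incrementing the semaphore.
sub worker {
    my ($queue, $slots, $done) = @_;
    while (defined(my $job = $queue->dequeue())) {   # undef is the stop marker
        $slots->up();                 # Job removed from queue
        $done->enqueue($job);         # Stand-in for the real work
    }
}

# Producer: try a non-blocking decrement first, block only if the queue is full.
sub submit {
    my ($queue, $slots, $job) = @_;
    my $was_full = 0;
    if (!$slots->down_nb()) {         # No free slot right now
        $was_full = 1;                # This is where thread health could be checked
        $slots->down();               # Wait until a worker frees a slot
    }
    $queue->enqueue($job);
    return $was_full;
}

sub run_demo {
    my ($jobs, $workers) = @_;
    my $queue = Thread::Queue->new();
    my $done  = Thread::Queue->new();
    my $slots = Thread::Semaphore->new($workers);    # Max number of queued jobs
    my @pool  = map { threads->create(\&worker, $queue, $slots, $done) } 1 .. $workers;
    submit($queue, $slots, $_) for 1 .. $jobs;
    $queue->enqueue((undef) x @pool); # One stop marker per worker
    $_->join() for @pool;
    return $done->pending();          # Number of jobs actually processed
}

print run_demo(20, 4), " jobs processed\n";
```

Because the producer only blocks in down() when the queue already holds as many jobs as there are slots, a live worker will always free a slot eventually; the blocking call only hangs forever if every worker has died.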

However, my demo code shows a bug which is (apparently) not present in the production code: one of the threads terminates silently, without any intentional error injection, and I can't find out why.

Code example:

#!/usr/bin/perl
# -*- tab-width: 4 -*-
use strict;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;
use Time::HiRes ('usleep');

my $screenaccess :shared;   # Lock for printing to screen
my $multi :shared;          # Number simultaneous threads
my $logline;                # Log line content
my $logsize;                # Log line size (not counting attributes)
my @log :shared;            # For debugging, in case a thread errors out
my $dispatcher :shared;     # Job queue
my $queuelen;               # Max queue length
my $throttle :shared;       # Queue semaphore
my @thread;

sub initThreadedOperation {
    $multi = shift;
    $queuelen = 1 * $multi;
    $dispatcher = Thread::Queue->new();
    $throttle = Thread::Semaphore->new($queuelen);
    for (1 .. $multi) {
        push ( @thread, threads->create (\&processrefsthread, $_) );
    }
    print STDERR scalar(@thread), ' threads created', "\n";
}

sub syncIdle {
    # Check if any thread errored out
    my $abort = 0;
    for my $i (0..$#thread) {
        if ( !$thread[$i]->is_running() || $thread[$i]->is_joinable() ) {
            lock($screenaccess);
            print (STDERR 'ERROR: thread #'
                , 1+$i
                , ' encountered a problem while processing request'
                , "\n"
                , $log[1+$i]
                , "\n"
                , 'Check the cause and eventually report a bug.'
                , "\n"
                );
            $abort ||= 1;
        }
    }
    if ($abort) {
        endThreadedOperation();
        print (STDERR "Flushing and aborting now ...\n");
        print (STDERR 'The error message may have scrolled out due to asynchronous operation. Check.', "\n");
        exit(1);
    }
    while (0 < $dispatcher->pending()) {
        threads->yield();
#       sleep(1);   # Retry later
    }
}

sub endThreadedOperation {
    $dispatcher->enqueue((undef) x scalar(@thread));
    foreach (@thread) {
        $_->join();
    }
}

sub logStart {
    my ($where, $text, $attr) = @_;
    if ( $where < 0 || $where > $multi ) {
        return;
    }
    $logsize = length($text);
    $logline = $attr . $text;
    $log[$where] = $logline;    # For debugging
    lock($screenaccess);
    print (STDERR '++ ', $text, "\n");
}

sub logFinal {
    my ($where, $text, $attr) = @_;
    if ( $where < 0 || $where > $multi ) {
        return;
    }
    $logline .= $attr . $text;
    lock($screenaccess);
    print (STDERR '-- ', $logline, "\n");
    $logline = undef;
    $log[$where] = undef;
}

sub processrefsthread {
    my $logid = shift;
    logFinal($logid, "Thread #$logid created\n", '');
    while (my $job = $dispatcher->dequeue()) {
        $throttle->up();    # Job removed from queue
        my $d = int(rand(5));
        logStart ( $logid
            , "$logid processing $job (duration $d)"
            , ''
            );
#       sleep($d);
        for my $i (0 .. $d * 500000) {
            my $j = $i * $d;
            $j = int(rand($j));
        }
#######
#   Inoculate a bug
#
#
#       if (1 == $logid)
#       {
#           logFinal($logid, "\nkilling thread #$logid", '');
#           my $z = 0;
#           $z = 1/$z;
#   #       threads->exit();
#       }
#######
        logFinal($logid, ' ... Done', '');
    }
}

sub queueProcessRequest {
    my ($job) = @_;
    # If the queue fills up, it may be caused by threads killed
    # by an error. In this case, we'll be blocked forever below.
    # Then let's have a look on the threads.
#   if ($queuelen <= $dispatcher->pending()) {
    if (!$throttle->down_nb()) {
        {
            lock($screenaccess);
            print STDERR "***** Queue full\n";
        }
#       threads->yield();
#       sleep(1);   # Give a chance
        foreach my $t (@thread) {
            if ( !$t->is_running() || $t->is_joinable() ) {
                syncIdle();     # Diagnose and abort
            }
        }
        $throttle->down();
    }
    $dispatcher->enqueue($job);
    return undef
}

initThreadedOperation(4);
for my $i (0..50) {
    queueProcessRequest ($i);
}
logFinal(0, "Queueing completed!\n", '');
syncIdle();
endThreadedOperation();

When run as is, it reports that a (randomly selected) thread is already terminated, before trying to process even its first job.
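A possible lead, offered as an assumption rather than a confirmed diagnosis: the worker loop tests the dequeued job for truth, and the demo queues job numbers starting at 0. In Perl, 0 is false, so a loop of the form while (my $job = $queue->dequeue()) would stop when it receives job 0, exactly as it does for the undef stop marker. The sketch below compares that loop with a defined() test; drain_truthy and drain_defined are hypothetical names used only here, and no worker threads are started.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical helpers for illustration only.

# Same loop condition as the demo worker: stops on ANY false value.
sub drain_truthy {
    my $q = Thread::Queue->new(@_, undef);   # undef is the stop marker
    my @seen;
    while (my $job = $q->dequeue()) {
        push @seen, $job;
    }
    return @seen;
}

# Stops only on the undef stop marker, so job 0 is processed normally.
sub drain_defined {
    my $q = Thread::Queue->new(@_, undef);
    my @seen;
    while (defined(my $job = $q->dequeue())) {
        push @seen, $job;
    }
    return @seen;
}

my @truthy  = drain_truthy(0, 1, 2);
my @defined = drain_defined(0, 1, 2);
print "truth test saw ",   scalar(@truthy),  " job(s)\n";   # 0 job(s)
print "defined test saw ", scalar(@defined), " job(s)\n";   # 3 job(s)
```

If this is indeed what happens, whichever thread dequeues job 0 would leave its loop before logging any work, which would match the symptom of a random thread being found terminated before its first job.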

If I wrote something wrong here, the same problem might also occur in the production code.

Note: to make a thread error out deliberately, uncomment the lines in the "Inoculate a bug" block.


In reply to Re^2: How can I force thread switching? by ajl52
in thread How can I force thread switching? by ajl52
