Hi,
Trying to streamline my code to make it a standalone sample, I changed a bit the "alarm" criteria from $queuelen <= $throttle->pending() to a non-blocking semaphore decrement $throttle->down_nb().
It now works as expected without need for sleep(), nor yield().
However, my demo code shows a bug which is (apparently) not present in production code: one of the threads is silently terminated (without intentional error injection) and I can't find why.
Code example:
#!/usr/bin/perl
# -*- tab-width: 4 -*-
use strict;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;
use Time::HiRes ('usleep');
my $screenaccess :shared; # Lock for printing to screen
my $multi :shared; # Number simultaneous threads
my $logline; # Log line content
my $logsize; # Log line size (not counting attributes)
my @log :shared; # For debugging, incase a thread errors out
my $dispatcher :shared; # Job queue
my $queuelen; # Max queue length
my $throttle :shared; # Queue semaphore
my @thread;
sub initThreadedOperation {
$multi = shift;
$queuelen = 1 * $multi;
$dispatcher = Thread::Queue->new();
$throttle = Thread::Semaphore->new($queuelen);
for (1 .. $multi) {
push ( @thread
, threads->create (\&processrefsthread, $_)
);
}
print STDERR scalar(@thread), ' threads created', "\n";
}
sub syncIdle {
# Check if any thread errored out
my $abort = 0;
for my $i (0..$#thread) {
if ( !$thread[$i]->is_running()
|| $thread[$i]->is_joinable()
) {
lock($screenaccess);
print (STDERR
'ERROR: thread #'
, 1+$i
, ' encountered a problem while processing request
+'
, "\n"
, $log[1+$i]
, "\n"
, 'Check the cause and eventually report a bug.'
, "\n"
);
$abort ||= 1;
}
}
if ($abort) {
endThreadedOperation();
print (STDERR "Flushing and aborting now ...\n");
print (STDERR 'The error message may have scrolled out due to
+asynchronous operation. Check.', "\n");
exit(1);
}
while (0 < $dispatcher->pending()) {
threads->yield();
# sleep(1); # Retry later
}
}
sub endThreadedOperation {
$dispatcher->enqueue((undef) x scalar(@thread));
foreach (@thread) {
$_->join();
}
}
sub logStart {
my ($where, $text, $attr) = @_;
if ( $where < 0
|| $where > $multi
) {
return;
}
$logsize = length($text);
$logline = $attr . $text;
$log[$where] = $logline; # For debugging
lock($screenaccess);
print (STDERR
'++ '
, $text
, "\n"
);
}
sub logFinal {
my ($where, $text, $attr) = @_;
if ( $where < 0
|| $where > $multi
) {
return;
}
$logline .= $attr . $text;
lock($screenaccess);
print (STDERR
'-- '
, $logline
, "\n"
);
$logline = undef;
$log[$where] = undef;
}
sub processrefsthread {
my $logid = shift;
logFinal($logid, "Thread #$logid created\n", '');
while (my $job = $dispatcher->dequeue()) {
$throttle->up(); # Job removed from queue
my $d = int(rand(5));
logStart ( $logid
, "$logid processing $job (duration $d)"
, ''
);
# sleep($d);
for my $i (0 .. $d * 500000) {
my $j = $i * $d;
$j = int(rand($j));
}
#######
# Inoculate a bug
#
# # if (1 == $logid)
# {
# logFinal($logid, "\nkilling thread #$logid", '');
# my $z = 0;
# $z = 1/$z;
# # threads->exit();
# }
#######
logFinal($logid, ' ... Done', '');
}
}
sub queueProcessRequest {
my ($job) = @_;
# If the queue fills up, it may be caused by threads killed
# by an error. In this case, we'll be blocked forever below.
# Then let's have a look on the threads.
# if ($queuelen <= $dispatcher->pending()) {
if (!$throttle->down_nb()) {
{ lock($screenaccess);
print STDERR "***** Queue full\n";
}
# threads->yield();
# sleep(1); # Give a chance
foreach my $t (@thread) {
if ( !$t->is_running()
|| $t->is_joinable()
) {
syncIdle(); # Diagnose and abort
}
}
$throttle->down();
}
$dispatcher->enqueue($job);
return undef
}
initThreadedOperation(4);
for my $i (0..50) {
queueProcessRequest ($i);
}
logFinal(0, "Queueing completed!\n", '');
syncIdle();
endThreadedOperation();
When run as is, it reports that a (randomly selected) thread is already terminated, before trying to process even its first job.
If I wrote something wrong, it might happen in the production code
Note: to cause threads to error out, uncomment the lines in "Inoculate a bug" block
-
Are you posting in the right place? Check out Where do I post X? to know for sure.
-
Posts may use any of the Perl Monks Approved HTML tags. Currently these include the following:
<code> <a> <b> <big>
<blockquote> <br /> <dd>
<dl> <dt> <em> <font>
<h1> <h2> <h3> <h4>
<h5> <h6> <hr /> <i>
<li> <nbsp> <ol> <p>
<small> <strike> <strong>
<sub> <sup> <table>
<td> <th> <tr> <tt>
<u> <ul>
-
Snippets of code should be wrapped in
<code> tags not
<pre> tags. In fact, <pre>
tags should generally be avoided. If they must
be used, extreme care should be
taken to ensure that their contents do not
have long lines (<70 chars), in order to prevent
horizontal scrolling (and possible janitor
intervention).
-
Want more info? How to link
or How to display code and escape characters
are good places to start.