http://www.perlmonks.org?node_id=1056140

ajl52 has asked for the wisdom of the Perl Monks concerning the following question:

Dear Perl monks,

I have sped up a time-consuming task through concurrent execution of several instances of the same code ("crew" thread model). It is quite effective since time to complete the task decreases from 2 hours 33 minutes to 56 minutes.

The question is: How can protect against ALL threads erroring out?

Basically my code queues the jobs to be done with throttling control through a semaphore to prevent memory flooding with pending jobs:

sub queueProcessRequest { my ($job) = @_; $throttle->down(); $dispatcher->enqueue($job); return undef }

Worker thread increments the semaphore as soon as it removes a job from the queue.

As long as there remains a live worker thread, the queue can be emptied and the main thread is not blocked on the semaphore. When it has finished queuing the job, it can check thread states and wait for termination:

sub syncIdle { # Check if any thread errored out my $abort = 0; for my $i (0..$#thread) { if ( !$thread[$i]->is_running() || $thread[$i]->is_joinable() ) { lock($screenaccess); print (STDERR 'ERROR: thread #' , 1+$i , ' encountered a problem while processing file' , "\n" , $log[1+$i] , "\n" , 'Check the cause and eventually report a bug.' , "\n" ); $abort ||= 1; } } if ($abort) { endThreadedOperation(); print (STDERR "${VTred}Flushing and aborting now ...${VTnorm}\ +n"); print (STDERR 'The error message may have scrolled out due to +asynchronous operation. Check.', "\n"); exit(1); } while ($busy || 0 < $dispatcher->pending()) { threads->yield(); # sleep(1); # Retry later } }

However, if ALL worker threads errored out (if a bug is present, it is likely to happen in all threads since they share the same code base), the job queue eventually fills up, the main thread is blocked on the semaphore and never gets the opportunity to call syncIdle()

I tried putting some threads->yield() in adequate locations but on my Fedora Linux yield() is just a no-op (as the manual warns).

I then modified the queueing sub as follows:

sub queueProcessRequest { my ($job) = @_; # If the queue fills up, it may be caused by threads killed # by an error. In this case, we'll be blocked forever below. # Then let's have a look on the threads. if ($queuelen <= $dispatcher->pending()) { # threads->yield(); sleep(1); # Give a chance foreach my $t (@thread) { if ( !$t->is_running() || $t->is_joinable() ) { syncIdle(); # Diagnose and abort } } } $throttle->down(); $dispatcher->enqueue($job); return undef }

This works as intended, BUT ...

It looks like the queue gets filled first. I then see a pause (sleep(1) above) and worker threads get scheduled to do their job. The cycle restarts.

Note: all debugging code not shown in code snippet.

My analysis is thread switching occurs only at sleep() time.

I can't afford to leave such sleep(1) calls in the code since it would mean about 5000 seconds (roughly 1 hour and a half compared to 56 minutes) wasted to wait for scheduling.

I replaced sleep() with usleep() but the delay must also be long enough to give switching a chance (on the order of 15 ms). Unfortunately, the required minimum delay seems to depend on the number of worker threads, the global machine load, ... and is affected by some sort of jitter.

What I need is a way to force thread switching without causing delay so that my job queue can be emptied by the worker threads.

How can I do it?

My design may also be wrong. Is there an alternate suggestion?