http://www.perlmonks.org?node_id=971468

Anonymous Monk has asked for the wisdom of the Perl Monks concerning the following question:

Hi All, I'm very new to PERL threading.. and got a bit struck in implementing a ThreadPool. I saw the Thread::Pool module, but since that has the implementation for older thread model, I decided to try and implement one on my own(just an experiment). The idea is the same - put all the work in a queue and a fixed number of threads should work on it, till the queue is emptied.
my $pool = Custom::Utility::ThreadPool->new(maxThreads => 5); $pool->enqueue(&func, $arg1, $arg2...); $pool->start; $pool->blockTillCompletion;
Here is my module :
package Custom::Utility::ThreadPool; use strict; use threads; use threads::shared; sub new { my $class = shift; my %params = @_; my $pool = { maxThreads => $params{maxThreads}, workQueue => undef, jobs => undef, }; bless($pool, $class); return $pool; } sub work { my $pool = shift; while(my @args = shift @{$pool->{workQueue}}) { print "ID=" .threads->self()->tid() ." : WorkQueue = " .@{$pool->{wor +kQueue}} ."\n"; #problem here; every thread seems to have its own co +py of @{$pool->{workQueue}} my $code = shift @args; &$code(@args); } } sub start { my $pool = shift; # ensure that start is executed only once on a ThreadPool instance return if $pool->{jobs}; # make sure there undefs are enqueued to the workQueue so that child + threads can # complete the process rather than being blocked on Thread::Queue's +dequeue push @{$pool->{workQueue}}, ((undef) x $pool->{maxThreads}); @{$pool->{jobs}} = map { threads->create(\&work, $pool) } 1..$pool-> +{maxThreads}; } sub enqueue { my ($pool, $code, @args) = @_; # default the namespace to the caller's package if only a name is # specificed instead of a code or its reference $code = caller() ."::$code" if !ref($code) && $code !~ /::/; # push it to the workQueue list push @{$pool->{workQueue}}, [$code, @args]; } sub blockTillCompletion { my $pool = shift; $_->join() for @{$pool->{jobs}}; } 1;
The problem I'm facing is - in the work subroutine, every thread is getting its own copy of $pool it seems. Say if there are 10 items in the workQueue, all threads executing work subroutine seems to get a copy of the workQueue and each proceeds to execute all 10 of them. The expected is the 10 work items must be shared and completed. I tried to share @{$pool->workQueue} but wasn't able to. PERL gave me compile errors saying 'invalid value for shared scalar'. I'm using PERL5.8.8. Can anybody help me in overcoming this, please.

Replies are listed 'Best First'.
Re: Implementing Custom ThreadPool
by zwon (Abbot) on May 20, 2012 at 10:36 UTC
    Thread::Pool module, but since that has the implementation for older thread model

    What made you think so? It seems passes tests on modern perls without any problems.

    every thread is getting its own copy of $pool

    That is because $pool is not shared, you use threads::shared but aren't really using it. Have also a look onto Thread::Queue.

Re: Implementing Custom ThreadPool
by BrowserUk (Patriarch) on May 20, 2012 at 14:24 UTC

    I find it very puzzling that you reference Thread::Queue in your comments:

    # make sure there undefs are enqueued to the workQueue so that child + threads can # complete the process rather than being blocked on Thread::Queue's +dequeue

    but seem to have forgotten to include it in your code. Especially as it would exactly address the apparent problem with your "experiment".

    Consistently reinforcing your "I'm very new to PERL threading.." by upcasing Perl is a really cheesy touch.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

    The start of some sanity?

Re: Implementing Custom ThreadPool
by Khen1950fx (Canon) on May 20, 2012 at 13:09 UTC
    Your script using Thread::Queue:
    package myQueue; use strictures 1; use autodie qw/:all/; use Devel::SimpleTrace; use threads qw/yield/; use Thread::Queue; sub new { use strict qw/refs/; use warnings; my $class = shift(); my (@params) = shift @_; return bless(@params, $class); } sub work { use strict qw/refs/; use warnings; my($pool) = shift @_; while ( my (@args) = shift @{ $$pool{'workQueue'}; } ) { print "ID=" . threads->self()->tid() . " : WorkQueue = " . @{ $$pool{'workQueue'}; }; my($code) = shift @args; &$code(@args); } } sub start { use strict qw/refs/; use warnings; my($pool) = shift(); return if $$pool{'jobs'}; push @{ $$pool{'workQueue'}; }, (undef) x $$pool{'maxThreads'}; @{ $$pool{'jobs'}; } = map( { threads->create( \&work, $pool ); } 1 .. $$pool{'maxThreads'} ); } sub enqueue { use strict qw/refs/; use warnings; my ( $pool, $code, @args ) = @_; $code = caller() . "::$code" unless ref $code or $code =~ /::/; push @{ $$pool{'workQueue'}; }, [ $code, @args ]; } my $maxThreads = 5; my @args = @_; my $pool = Thread::Queue->new(\$maxThreads); $pool->enqueue($args[0], $args[1]); my $left = $pool->pending(); print "There are $left items waiting in the queue\n";