I submitted a bug report.
I did some benchmarking,
and measured the time it takes to run a function 300 times.
The results are, again, most surprising to me.
With 10 worker threads in a pool I get this:
micha@laptop ~/prog/perl/test $ time ./threads_benchmark.pl
real 0m36.666s
user 1m11.872s
sys 0m0.172s
(resident memory 6.3MB)
#!/usr/bin/perl -w
use threads;
use threads::shared;
#use forks;
#use forks::shared;
#use Storable;
# --- Pool state shared between the main thread and all workers ---
# Upper bound on the number of worker threads; T_thread checks it before
# spawning an additional worker.
share $T_maxthreads;
$T_maxthreads = 10;
# Queue length above which T_enqueue blocks after adding a job.
share $T_maxpending;
$T_maxpending = 20;
# Number of live worker threads (incremented when T_thread starts,
# decremented when a worker exits on shutdown).
share $T_threads;
$T_threads = 0;
# Number of workers currently looking for, or waiting on, a job.
share $T_freethreads;
$T_freethreads = 0;
# Number of workers currently executing a job; also the condition variable
# threadpool_wait sleeps on.
share $T_workingthreads;
$T_workingthreads = 0;
# Condition variable: a new worker signals it once, the first time it finds
# the queue empty; threadpool_init waits on it.
share $T_threadstart;
# The job queue: T_enqueue pushes shared job hashes, T_thread shifts them.
share @T_pool;
# Lock guarding @T_pool.
share $T_poollock;
# Number of queued jobs; also the condition variable T_enqueue waits on
# when the queue is over $T_maxpending.
share $T_poolcount;
$T_poolcount = 0;
# Condition variable: signalled when a job is queued, broadcast on shutdown.
share $T_morework;
# resultid => shared hash with keys "state" and "result", for jobs queued
# through renqueue.
share %T_results;
# Lock guarding %T_results.
share $T_resultslock;
# Result ids released by threadpool_waitforresult, available for reuse.
share @T_freeresultids;
# Lock guarding @T_freeresultids.
share $T_freeresultidslock;
# Next result id to hand out when no released id is available.
share $T_nextresultid;
$T_nextresultid = 1;
# Set to 1 by threadpool_shutdown; a worker that wakes up and sees it exits.
share $T_shutdown;
$T_shutdown = 0;
## Initializes the thread pool.
## args: (named)
## -maxthreads: maximum number of threads (default: 10)
## -maxpending: how many jobs may be enqueued; if you try to enqueue more
##   jobs, enqueue will block until a job has been taken (default: 20)
## -startthreads: threads to start on startup (default: 5, capped at
##   maxthreads)
## Returns once the requested number of workers is running. If the
## effective number of start threads is below 1, nothing is started and
## the call returns immediately (no worker will pick up jobs in that case).
sub threadpool_init {
    my %args = @_;
    if ( defined( $args{maxthreads} ) ) {
        $T_maxthreads = $args{maxthreads};
    }
    if ( defined( $args{maxpending} ) ) {
        $T_maxpending = $args{maxpending};
    }
    if ( !defined( $args{startthreads} ) ) {
        $args{startthreads} = 5;
    }
    if ( $T_maxthreads < $args{startthreads} ) {
        $args{startthreads} = $T_maxthreads;
    }

    # With no worker to start nobody would ever signal $T_threadstart, and
    # the wait loop below would block forever.
    return if $args{startthreads} < 1;

    # Hold the start-up lock while spawning: a worker cannot signal
    # $T_threadstart before this thread has entered cond_wait, so no
    # signal is missed.
    lock $T_threadstart;
    for ( 1 .. $args{startthreads} ) {
        my $t = threads->create("T_thread");
        $t->detach();
    }
    my $threads;
    do {    # Wait until all threads have been started and are waiting for jobs
        cond_wait $T_threadstart;
        {
            lock $T_threads;
            $threads = $T_threads;
        }
    } while ( $threads < $args{startthreads} );
}
## Waits for all jobs to finish, then tells the workers to exit.
## Sets $T_shutdown and keeps broadcasting on $T_morework (every 0.1s)
## until $T_threads has dropped to 0.
## Sleeps a further 0.25s afterwards, in the hope that all threads will
## have cleaned up by then. That is purely cosmetic: it avoids the warning
## about threads still running when the script exits.
sub threadpool_shutdown {
    threadpool_wait();
    print "thr_waiting\n";
    {
        lock $T_shutdown;
        $T_shutdown = 1;
    }
    my $alive;
    do {
        {
            # Wake every idle worker so it can notice the shutdown flag.
            lock $T_morework;
            cond_broadcast $T_morework;
        }
        print "loop\n";
        {
            # Take a snapshot and release the lock right away: an exiting
            # worker needs $T_threads to decrement the counter, so it must
            # not stay locked while this thread sleeps.
            lock $T_threads;
            $alive = $T_threads;
        }
        if ( $alive > 0 ) {
            print "waiting, threads: $alive\n";
            select undef, undef, undef, 0.1;
        }
    } while ( $alive > 0 );
    select undef, undef, undef, 0.25;
}
# A worker thread.
# Registers itself in $T_threads, then loops forever:
#   1. counts itself as free and takes the next job from @T_pool, sleeping
#      on $T_morework while the queue is empty;
#   2. if it was the last free worker and $T_maxthreads allows, spawns one
#      more worker;
#   3. runs the job; when the job asked for a result, stores it in
#      %T_results and signals the waiter.
# Returns (ending the thread) when it wakes up and finds $T_shutdown set.
# The job function is called by name with the job's arguments followed by
# "\n" and this worker's number.
sub T_thread {
    my $init = 1;    # true until this worker has signalled $T_threadstart
    my $tn;          # this worker's number: value of $T_threads at start-up
    {
        lock $T_threads;
        $T_threads++;
        $tn = $T_threads;
    }
    while (1) {
        {
            lock $T_freethreads;
            $T_freethreads++;
        }
        my $job;
        {
            do {
                # Held from the queue check until cond_wait, so a producer
                # cannot signal between the two and have the signal lost.
                lock $T_morework;
                {
                    lock $T_poollock;
                    $job = shift @T_pool;
                }
                if ( !defined($job) ) {
                    if ($init) {
                        # First idle moment: report to threadpool_init.
                        lock $T_threadstart;
                        cond_signal $T_threadstart;
                        $init = 0;
                    }
                    cond_wait $T_morework;
                    {
                        lock $T_shutdown;
                        if ($T_shutdown) {
                            lock $T_threads;
                            $T_threads--;
                            return;
                        }
                    }
                }
            } while ( !defined($job) );
            lock $T_poolcount;
            $T_poolcount--;
            cond_signal $T_poolcount;    # may release a producer blocked in T_enqueue
        }

        # Test if there are still enough free threads.
        {
            lock $T_freethreads;
            $T_freethreads--;
            if ( $T_freethreads == 0 ) {    # No threads left for the work
                lock $T_maxthreads;
                lock $T_threads;
                if ( $T_maxthreads > $T_threads ) {
                    my $thread = threads->create("T_thread");
                    $thread->detach();
                }
            }
        }
        {
            lock $T_workingthreads;
            $T_workingthreads++;
        }

        # Run the job inside eval: if it dies, the bookkeeping below must
        # still run, otherwise threadpool_wait and any result waiter would
        # block forever. A failed job yields an undef result.
        my $result;
        my $ok = eval {
            $result = &{ $job->{function} }( @{ $job->{args} }, "\n", $tn );
            1;
        };
        if ( !$ok ) {
            warn "threadpool: job '$job->{function}' died: $@";
        }

        if ( $job->{result} ) {
            lock $T_resultslock;
            my $r = $T_results{ $job->{resultid} };
            lock $r;
            $r->{state}  = 1;
            $r->{result} = $result;
            cond_signal $r;
        }
        {
            lock $T_workingthreads;
            $T_workingthreads--;
            cond_signal $T_workingthreads;
        }
    }
}
## Returns the result stored for the given resultid, blocking until the
## worker has delivered it.
## The entry is removed from %T_results and the id is put back on the free
## list, so this may be called only once per resultid.
## Returns nothing (undef in scalar context) if there is no such resultid.
sub threadpool_waitforresult {
    my ($resultid) = @_;

    my $slot;
    {
        lock $T_resultslock;
        $slot = $T_results{$resultid};
    }
    return unless defined $slot;

    # All three locks are released together when the sub returns.
    lock $slot;
    cond_wait($slot) until $slot->{state} != 0;    # Wait for the result
    my $value = $slot->{result};

    lock $T_resultslock;
    delete $T_results{$resultid};
    lock $T_freeresultidslock;
    push @T_freeresultids, $resultid;

    return $value;
}
## Enqueues a new job whose return value is not kept.
## args:
##   the function name, which will be called in the current context
##   the arguments to be supplied to the function
## returns whatever T_enqueue returns (resultid 0 for this kind of job)
sub enqueue {
    my ( $function, @job_args ) = @_;
    return T_enqueue( $function, 0, @job_args );
}
## Enqueues a new job and returns a resultid, which can be used to fetch
## the function's result via threadpool_waitforresult.
## args:
##   the function name, which will be called in the current context
##   the arguments to be supplied to the function
sub renqueue {
    my ( $function, @job_args ) = @_;
    return T_enqueue( $function, 1, @job_args );
}
## Internal: builds a shared job record, appends it to @T_pool and signals
## $T_morework.
## args:
##   $function - name of the function the worker will call
##   $result   - > 0 if the return value is to be stored for
##               threadpool_waitforresult
##   remaining - arguments for the function (copied into a shared array)
## returns the resultid (0 when no result was requested)
## Blocks after queueing while more than $T_maxpending jobs are pending.
sub T_enqueue {
    my $function = shift;
    my $result   = shift;

    # The containers are shared before they are filled so workers can read
    # them. The scalars stored in them are copied on assignment and need no
    # share() of their own.
    share my @args;
    @args = @_;
    my %hash;
    share %hash;
    $hash{function} = $function;
    $hash{args}     = \@args;
    $hash{result}   = $result;

    my $resultid = 0;
    if ( $result > 0 ) {    # Want the result saved
        {
            # Prefer an id that threadpool_waitforresult has released.
            lock $T_freeresultidslock;
            $resultid = shift @T_freeresultids;
        }
        if ( !defined($resultid) ) {
            lock $T_nextresultid;
            $resultid = $T_nextresultid;
            $T_nextresultid++;
        }
        $hash{resultid} = $resultid;
        share my %h;
        $h{state} = 0;      # No result yet ...
        {
            lock $T_resultslock;
            $T_results{$resultid} = \%h;
        }
    }

    lock $T_poolcount;
    {
        lock $T_morework;
        lock $T_poollock;
        push @T_pool, \%hash;
        cond_signal $T_morework;
    }
    $T_poolcount++;

    # Re-check after every wake-up: cond_wait may return without the queue
    # having dropped back to the limit.
    while ( $T_poolcount > $T_maxpending ) {
        print "Waiting, poolcount: $T_poolcount\n";
        cond_wait $T_poolcount;
        print "Waited, poolcount: $T_poolcount\n";
    }
    return $resultid;
}
## Returns the number of workers that are executing a job right now.
sub threadpool_threadsworking {
    lock $T_workingthreads;
    my $busy = $T_workingthreads;
    return $busy;
}
## Blocks until no job is queued and no worker is executing one.
sub threadpool_wait {
    for ( ; ; ) {

        # Lock is scoped to one pass of the loop; cond_wait releases it
        # while sleeping.
        lock $T_workingthreads;

        my $queued;
        {
            lock $T_poolcount;
            $queued = $T_poolcount;
        }

        return unless $T_workingthreads + $queued > 0;

        # Woken each time a worker finishes a job; then look again.
        cond_wait $T_workingthreads;
    }
}
# TEST CODE ###################################################################
# Benchmark payload: burns CPU for (first argument * 1000) loop passes and
# returns the first argument plus 100. The arithmetic result is discarded.
sub function {
    my ($n) = @_;
    my $acc = 0;
    foreach my $pass ( 1 .. ( $n * 1000 ) ) {
        $acc += 1;
        $acc /= 10;
        $acc += 1;
        $acc *= 10;
        $acc /= ( $n + 1 );
        $acc /= 1.2334645897;
        $acc *= 234.2312125;
        $acc /= $n;
        $acc += 1;
        $acc *= $n;
    }
    return $n + 100;
}
# Benchmark driver: start a pool of 10 workers, queue 300 jobs (the return
# values are not collected), then drain and stop the pool.
print "Init.\n";
threadpool_init(
    startthreads => 10,
    maxthreads   => 10,
    maxpending   => 300,
);
foreach my $n ( 1 .. 300 ) {
    enqueue( "function", $n );
}
print "Waiting now, threads working: ", threadpool_threadsworking(), "\n";

# threadpool_shutdown waits for all jobs itself before stopping the workers.
threadpool_shutdown();
print "Exiting now, threads working: ", threadpool_threadsworking(), "\n";
And here is the version that creates and destroys 300 threads, running 10 threads concurrently:
micha@laptop ~/prog/perl/test $ time ./threadpool.pl
real 0m33.001s
user 1m4.728s
sys 0m0.024s
(max resident memory rising indefinitely, 10MB)
#!/usr/bin/perl -w
use threads;
# Benchmark payload: burns CPU for (first argument * 1000) loop passes and
# returns the first argument plus 100. The arithmetic result is discarded.
sub function {
    my ($n) = @_;
    my $acc = 0;
    foreach my $pass ( 1 .. ( $n * 1000 ) ) {
        $acc += 1;
        $acc /= 10;
        $acc += 1;
        $acc *= 10;
        $acc /= ( $n + 1 );
        $acc /= 1.2334645897;
        $acc *= 234.2312125;
        $acc /= $n;
        $acc += 1;
        $acc *= $n;
    }
    return $n + 100;
}
# Benchmark driver: run function(1) .. function(300), each in a thread of
# its own, keeping about 10 threads alive at a time.
my @threads;

# Prime the window with the first 10 jobs.
for my $n ( 1 .. 10 ) {
    push @threads, threads->create( "function", $n );
}

# Continue at 11: job 10 was already started above, so starting again at 10
# would run it twice and create 301 threads instead of 300.
for my $n ( 11 .. 300 ) {
    push @threads, threads->create( "function", $n );
    my $oldest = shift @threads;
    $oldest->join();    # return value of the job is not used
}

# Collect the threads that are still running.
$_->join for @threads;
Ok, my thread pool implementation is far from being perfect, but I had guessed the differences in speed would be more obvious.
Btw, I also benchmarked Thread::Pool:
Thread::Pool
micha@laptop ~/prog/perl/test $ time ./thread_pool_benchmark.pl
real 0m36.105s
user 1m9.780s
sys 0m0.048s
15M res memory
#!/usr/bin/perl -w
use Thread::Pool;
# Benchmark payload: burns CPU for (first argument * 1000) loop passes and
# returns the first argument plus 100. The arithmetic result is discarded.
sub function {
    my ($n) = @_;
    my $acc = 0;
    foreach my $pass ( 1 .. ( $n * 1000 ) ) {
        $acc += 1;
        $acc /= 10;
        $acc += 1;
        $acc *= 10;
        $acc /= ( $n + 1 );
        $acc /= 1.2334645897;
        $acc *= 234.2312125;
        $acc /= $n;
        $acc += 1;
        $acc *= $n;
    }
    return $n + 100;
}
# Benchmark driver: the same 300 jobs, run through Thread::Pool with 10
# workers.
my $p = Thread::Pool->new(
    {
        optimize => 'cpu',          # default: memory
        do       => \&function,     # must have
        workers  => 10,
    }
);
for my $n ( 1 .. 300 ) {
    $p->job($n);
}
$p->shutdown();
My thread pool implementation (the first benchmark and code listed) seems to be stable and not too bad;
I believe it already has some advantages over Thread::Pool.
Is there a recommended way to publish such code, perhaps in Snippets?
Update: Quoted the scripts in readmore tags, as suggested,
I'm also going to publish my ongoing work in
my scratchpad
Comments welcome