package threadpool;
# A test implementation of a pool of threads,
# Storing the results of the threads' jobs in a queue
# this is work in progress, please comment
# Usage:
# use threadpool;
# my $pool = threadpool::new( OPTIONS )
# $pool->enqueue("main::functiontorun", @arguments_for_the_function );
# my $resultid = $pool->renqueue("main::functiontorun", @arguments_for
+_the_function );
# print $pool->waitforresult( $resutid );
# $pool->wait(); # Waits until all jobs are done
# $pool->shutdown();
use threads;
use threads::shared;
#use forks;
#use forks::shared;
#use Storable;
## Inititializes the threadpool
## args: (named)
## -maxthreads: maximum number of threads (default:10)
## -maxpending: How many jobs may be enqueued, if you try to enqeue mo
+re jobs enqueue will block until a job has been done (default:20)
## -startthreads: Threads to start on startup (default:5)
sub new{
my %args = @_;
my $self = {};
bless $self;
share %{$self};
foreach my $k ( qw/maxthreads maxpending threads freethreads w
+orkingthreads threadstart poollock poolcount morework freeresultidslo
+ck nextresultid shutdown resultslock/ ){
share my $s;
$self->{$k} = \$s;
}
${$self->{maxthreads}} = 10;
${$self->{maxpending}} = 20;
${$self->{threads}} = 0;
${$self->{freethreads}} = 0;
${$self->{workingthreads}} = 0;
${$self->{nextresultid}} = 1;
${$self->{shutdown}} = 0;
${$self->{poolcount}} = 0;
share my @pool;
$self->{pool} = \@pool;
share my %results;
$self->{results} = \%results;
share my @freeresultids;
$self->{freeresultids} = \@freeresultids;
if ( defined( $args{maxthreads} ) ){
${$self->{maxthreads}} = $args{maxthreads};
}
if ( defined( $args{maxpending} ) ){
${$self->{maxpending}} = $args{maxpending};
}
if (! defined( $args{startthreads} ) ){
$args{startthreads} = 5;
}
if ( ${$self->{maxthreads}} < $args{startthreads} ){
$args{startthreads} = ${$self->{maxthreads}};
}
lock ${$self->{threadstart}};
for ( 1.. $args{startthreads}){
my $t = threads->create("T_thread", $self);
$t->detach();
}
my $threads;
do { # Wait until all threads have been started and are waitin
+g for jobs
cond_wait ${$self->{threadstart}};
{
lock ${$self->{threads}};
$threads = ${$self->{threads}};
}
} while ( $threads < $args{startthreads} );
return $self;
}
## Waits for all threads to finish their jobs and ends them
## sleeps for 1 second after doing his job, in the hope, that all thre
+ads will have cleaned up.
## Is, however, just for the cosmetic of not beeing warned that thread
+s were running while exiting the script.
sub shutdown{
my $self = shift;
$self->wait();
print "thr_waiting\n";
{
lock ${$self->{shutdown}};
${$self->{shutdown}} = 1;
}
my $t;
do {
{
lock ${$self->{morework}};
cond_broadcast ${$self->{morework}};
}
print "loop\n";
#{
lock ${$self->{threads}};
$t = ${$self->{threads}};
#}
if ( $t > 0 ){
print "waiting, threads: ${$self->{threads}}\n
+";
select undef,undef,undef,0.1;
#cond_wait ${$self->{threads}};
}
} while ( $t > 0 );
select undef,undef,undef,0.25;
}
# A worker thread
sub T_thread{
my $self = shift;
my $init = 1;
my $tn;
{
lock ${$self->{threads}};
${$self->{threads}}++;
$tn = ${$self->{threads}};
}
print "Thread number $tn started.\n";
while ( 1 ){
{
lock ${$self->{freethreads}};
${$self->{freethreads}}++;
}
my $job;
{
do {
lock ${$self->{morework}};
#$dolock = 1;
{
lock ${$self->{poollock}};
$job = shift @{$self->{pool}};
}
if ( !defined( $job )){
if ( $init ){
lock ${$self->{threadstart}};
cond_signal ${$self->{threadst
+art}};
$init = 0;
}
#print "morework\n";
cond_wait ${$self->{morework}};
{
#print "lock shutdown\n";
lock ${$self->{shutdown}};
if ( ${$self->{shutdown}} ){
#print "shutting down.
+\n";
lock ${$self->{threads
+}};
${$self->{threads}} --
+;
#cond_signal ${$self->
+{threads}};
#print "thread exit\n"
+;
return;
}
}
}
} while ( !defined( $job ) );
lock ${$self->{poolcount}};
${$self->{poolcount}} --;
cond_signal ${$self->{poolcount}};
}
# Test if there are still enough freethreads..
{
lock ${$self->{freethreads}};
${$self->{freethreads}}--;
#print "thread: freethreads ${$self->{freethre
+ads}}\n";
if ( ${$self->{freethreads}} == 0 ){ # No thre
+ads left for the work
lock ${$self->{maxthreads}};
lock ${$self->{threads}};
if ( ${$self->{maxthreads}} > ${$self-
+>{threads}} ){
my $thread = threads->create("
+T_thread", $self);
$thread->detach();
}
}
}
{
lock ${$self->{workingthreads}};
${$self->{workingthreads}}++;
}
my $result = &{$job->{function}}(@{$job->{args}},"\n",
+$tn);
if ( $job->{result} ){
#share $result;
lock ${$self->{resultslock}};
my $r = $self->{results}->{$job->{resultid}};
lock $r;
#my $r = $T_results{$job->{resultid}};
$r->{state} = 1;
$r->{result} = $result;
cond_signal $r;
}
{
lock ${$self->{workingthreads}};
${$self->{workingthreads}}--;
cond_signal ${$self->{workingthreads}};
}
}
}
## returns the result of the supplied resultid,
## waits until the result is existing.
## returns undef if there is no such resultid.
sub waitforresult{
my $self = shift;
my $resultid = shift;
my $r;
{
lock ${$self->{resultslock}};
$r = $self->{results}->{$resultid};
}
if ( !defined($r)){
return;
}
my $result;
{
lock $r;
while ( $r->{state} == 0 ){ # Wait for the result
cond_wait $r;
}
$result = $r->{result};
lock ${$self->{resultslock}};
delete $self->{results}->{$resultid};
lock ${$self->{freeresultidslock}};
push @{$self->{freeresultids}}, $resultid;
}
return $result;
}
## Enqueues a new job.
## args:
## The function name, which will be callen in the current context
## Args to be supplied to the function
sub enqueue{
my $self = shift;
$self->T_enqueue( shift, 0, @_ );
}
## Enqueues a new job, and returns a resultid, which can be used to ge
+t the result of the function via threadpool_waitforresult or threadpo
+ol_getresult
## args:
## The function name, which will be callen in the current context
## Args to be supplied to the function
sub renqueue{
my $self = shift;
return $self->T_enqueue( shift, 1, @_ );
}
sub T_enqueue{
my $self = shift;
my $function = shift;
my $result = shift;
share my @args;
@args = @_;
my %hash;
share %hash;
share $function;
$hash{function} = $function;
$hash{args} = \@args;
share $result;
$hash{result} = $result;
my $resultid = 0;
if ( $result > 0 ){ # Want the result saved
{
lock ${$self->{freeresultidslock}};
$resultid = shift @{$self->{freeresultids}};
}
if ( !defined( $resultid ) ){
{
lock ${$self->{nextresultid}};
$resultid = ${$self->{nextresultid}};
${$self->{nextresultid}} ++;
}
}
share $resultid;
$hash{resultid} = $resultid;
share my %h;
$h{state} = 0; # No result yet ...
{
lock ${$self->{resultslock}};
$self->{results}->{$resultid} = \%h;
}
}
#print "enqueued: ", @{$hash{args}},"\n","@args","\n";
lock ${$self->{poolcount}};
{
lock ${$self->{morework}};
lock ${$self->{poollock}};
push @{$self->{pool}}, \%hash;
cond_signal ${$self->{morework}};
}
${$self->{poolcount}} ++;
if ( ${$self->{poolcount}} > ${$self->{maxpending}} ){
print "Waiting, poolcount: ${$self->{poolcount}}\n";
cond_wait ${$self->{poolcount}};
print "Waited, poolcount: ${$self->{poolcount}}\n";
}
return $resultid;
}
## Returns the current number of working threads
sub threadsworking{
my $self = shift;
lock ${$self->{workingthreads}};
return ${$self->{workingthreads}};
}
## blocks until all jobs are done
sub wait{
my $self = shift;
while ( 1 ){
{
lock ${$self->{workingthreads}};
my $pending;
{
lock ${$self->{poolcount}};
$pending = ${$self->{poolcount}};
}
if ( ${$self->{workingthreads}} + $pending > 0
+ ){
cond_wait ${$self->{workingthreads}};
} else {
return;
}
}
}
}
# TEST CODE ##########################################################
+#############################
1;