Beefy Boxes and Bandwidth Generously Provided by pair Networks
The stupid question is the question not asked
 
PerlMonks  

misc's scratchpad

by misc (Pilgrim)
on Jul 10, 2007 at 14:13 UTC ( #625833=scratchpad: print w/ replies, xml ) Need Help??

package threadpool;

# A test implementation of a pool of worker threads which can store the
# results of the jobs it runs.
# This is work in progress, please comment.
#
# Usage:
#   use threadpool;
#   my $pool = threadpool::new( OPTIONS );    # threadpool->new( OPTIONS ) works too
#   $pool->enqueue( "main::functiontorun", @arguments_for_the_function );
#   my $resultid = $pool->renqueue( "main::functiontorun", @arguments_for_the_function );
#   print $pool->waitforresult( $resultid );
#   $pool->wait();       # Waits until all jobs are done
#   $pool->shutdown();
#
# Jobs are identified by the (fully qualified) NAME of a subroutine, because
# code references cannot be stored in shared data structures.

use strict;
use warnings;

use Carp ();
use threads;
use threads::shared;
#use forks;
#use forks::shared;
#use Storable;

# Set to a true value BEFORE creating a pool to get progress messages on
# STDERR (worker threads receive a copy of this variable when they start).
our $DEBUG = 0;

# Prints its arguments to STDERR when $DEBUG is set.
sub _debug {
    print STDERR @_ if $DEBUG;
    return;
}

# Returns a value that may be stored in a shared container: plain scalars are
# returned untouched, references are converted with shared_clone() when the
# installed threads::shared provides it.
sub _shareable {
    my ($value) = @_;
    return $value unless ref $value;
    return threads::shared::shared_clone($value)
        if defined &threads::shared::shared_clone;
    return $value;
}

# Starts one detached worker thread for the pool.
# Returns 1 on success, 0 if the thread could not be created.
sub _spawn_worker {
    my ($self) = @_;
    my $thr = threads->create( \&T_thread, $self );
    return 0 unless $thr;
    $thr->detach();
    return 1;
}

## Initializes the threadpool.
## May be called as threadpool::new( ... ) or threadpool->new( ... ).
## args: (named, with or without a leading dash)
##   maxthreads:   maximum number of threads (default: 10)
##   maxpending:   how many jobs may be enqueued; if you try to enqueue more
##                 jobs, enqueue will block until a job has been taken
##                 (default: 20)
##   startthreads: threads to start on startup (default: 5, never more than
##                 maxthreads). Further workers are only spawned by running
##                 workers, so at least one start thread is needed for jobs
##                 to be processed.
## returns: the pool object
sub new {
    # An odd number of arguments means a class name (or object) came first.
    my $class = ( @_ % 2 ) ? shift(@_) : __PACKAGE__;
    $class = ref($class) || $class;
    my %given = @_;

    # Accept both "maxthreads" and the documented "-maxthreads" spelling.
    my %args;
    for my $name ( keys %given ) {
        ( my $plain = $name ) =~ s/\A-//;
        $args{$plain} = $given{$name};
    }

    my $self = {};
    share( %{$self} );
    bless $self, $class;

    # Every counter / lock lives in its own shared scalar, referenced from
    # the object, so that it can be locked independently of the others.
    for my $field (
        qw/maxthreads maxpending threads freethreads workingthreads
           threadstart poollock poolcount morework freeresultidslock
           nextresultid shutdown resultslock/
      )
    {
        share( my $slot );
        $self->{$field} = \$slot;
    }

    ${ $self->{maxthreads} }     = defined $args{maxthreads} ? $args{maxthreads} : 10;
    ${ $self->{maxpending} }     = defined $args{maxpending} ? $args{maxpending} : 20;
    ${ $self->{threads} }        = 0;
    ${ $self->{freethreads} }    = 0;
    ${ $self->{workingthreads} } = 0;
    ${ $self->{nextresultid} }   = 1;
    ${ $self->{shutdown} }       = 0;
    ${ $self->{poolcount} }      = 0;

    share( my @pool );
    $self->{pool} = \@pool;
    share( my %results );
    $self->{results} = \%results;
    share( my @freeresultids );
    $self->{freeresultids} = \@freeresultids;

    my $startthreads = defined $args{startthreads} ? $args{startthreads} : 5;
    if ( ${ $self->{maxthreads} } < $startthreads ) {
        $startthreads = ${ $self->{maxthreads} };
    }

    # Hold the start-up lock while spawning: workers need it to signal, so no
    # signal can be sent before we are actually waiting.
    lock ${ $self->{threadstart} };
    for ( 1 .. $startthreads ) {
        _spawn_worker($self)
            or Carp::croak("threadpool: could not start worker thread");
    }

    # Wait until all initial threads have registered themselves. The count is
    # checked BEFORE waiting, so asking for zero threads does not block.
    while (1) {
        my $started;
        {
            lock ${ $self->{threads} };
            $started = ${ $self->{threads} };
        }
        last if $started >= $startthreads;
        cond_wait ${ $self->{threadstart} };
    }

    return $self;
}

## Waits for all threads to finish their jobs and ends them.
## Sleeps for 0.25 seconds after doing its job, in the hope that all threads
## will have cleaned up. This is, however, just for the cosmetics of not being
## warned that threads were running while exiting the script.
sub shutdown {
    my $self = shift;

    $self->wait();
    _debug("thr_waiting\n");
    {
        lock ${ $self->{shutdown} };
        ${ $self->{shutdown} } = 1;
    }

    while (1) {
        {
            # Idle workers only look at the shutdown flag after being woken.
            lock ${ $self->{morework} };
            cond_broadcast ${ $self->{morework} };
        }
        _debug("loop\n");

        my $alive;
        {
            # Release the lock before sleeping; exiting workers need it to
            # decrement the counter.
            lock ${ $self->{threads} };
            $alive = ${ $self->{threads} };
        }
        last if $alive <= 0;

        _debug("waiting, threads: $alive\n");
        select undef, undef, undef, 0.1;
    }

    select undef, undef, undef, 0.25;
    return;
}

# A worker thread: registers itself, then loops forever taking jobs from the
# pool, running them and (for jobs enqueued with renqueue) storing the result.
# Returns, ending the thread, once the shutdown flag is set and it is woken
# while idle.
sub T_thread {
    my $self = shift;
    my $init = 1;    # true until the thread has been idle for the first time
    my $tn;
    {
        lock ${ $self->{threads} };
        ${ $self->{threads} }++;
        $tn = ${ $self->{threads} };
    }
    _debug("Thread number $tn started.\n");

    while (1) {
        {
            lock ${ $self->{freethreads} };
            ${ $self->{freethreads} }++;
        }

        my $job;
      IDLE:
        until ( defined $job ) {
            # The pool is inspected while holding "morework", so a signal
            # from T_enqueue cannot slip in between the check and the wait.
            lock ${ $self->{morework} };
            {
                lock ${ $self->{poollock} };
                $job = shift @{ $self->{pool} };
            }
            last IDLE if defined $job;

            if ($init) {
                # Tell new() that this thread is up.
                lock ${ $self->{threadstart} };
                cond_signal ${ $self->{threadstart} };
                $init = 0;
            }
            cond_wait ${ $self->{morework} };

            lock ${ $self->{shutdown} };
            if ( ${ $self->{shutdown} } ) {
                lock ${ $self->{threads} };
                ${ $self->{threads} }--;
                return;
            }
        }

        # Count the job as "working" BEFORE it stops counting as "pending",
        # so that wait() never sees both counters at zero while a job is
        # in flight.
        {
            lock ${ $self->{workingthreads} };
            ${ $self->{workingthreads} }++;
        }
        {
            lock ${ $self->{poolcount} };
            ${ $self->{poolcount} }--;
            # Broadcast: blocked producers re-check the limit in a loop.
            cond_broadcast ${ $self->{poolcount} };
        }

        # Test if there are still enough free threads.
        {
            lock ${ $self->{freethreads} };
            ${ $self->{freethreads} }--;
            if ( ${ $self->{freethreads} } == 0 ) {    # No threads left for the work
                lock ${ $self->{maxthreads} };
                lock ${ $self->{threads} };
                if ( ${ $self->{maxthreads} } > ${ $self->{threads} } ) {
                    _spawn_worker($self);
                }
            }
        }

        # Run the job (in scalar context). A dying job must not take the
        # worker down, otherwise the counters would never be released.
        my $result;
        my $ran = eval {
            no strict 'refs';    # the job is identified by its sub name
            my $code = \&{ $job->{function} };
            $result = $code->( @{ $job->{args} } );
            1;
        };
        if ( !$ran ) {
            warn "threadpool: job '$job->{function}' died: $@";
            $result = undef;
        }

        if ( $job->{result} ) {
            lock ${ $self->{resultslock} };
            my $r = $self->{results}->{ $job->{resultid} };
            lock $r;
            my $stored = eval { $r->{result} = _shareable($result); 1 };
            warn "threadpool: result of job '$job->{function}' cannot be shared: $@"
                unless $stored;
            $r->{state} = 1;
            cond_broadcast $r;
        }

        {
            lock ${ $self->{workingthreads} };
            ${ $self->{workingthreads} }--;
            cond_broadcast ${ $self->{workingthreads} };
        }
    }
}

## Returns the result of the supplied resultid;
## waits until the result exists.
## Returns undef (empty list in list context) if there is no such resultid.
## A result can be fetched only once; afterwards its id is recycled.
sub waitforresult {
    my ( $self, $resultid ) = @_;
    return unless defined $resultid;

    my $r;
    {
        lock ${ $self->{resultslock} };
        $r = $self->{results}->{$resultid};
    }
    return if !defined $r;

    my $result;
    {
        lock $r;
        while ( $r->{state} == 0 ) {    # Wait for the result
            cond_wait $r;
        }
        $result = $r->{result};
    }

    # The slot lock is released before taking the results lock, so locks are
    # never acquired in the opposite order to the worker thread.
    {
        lock ${ $self->{resultslock} };
        if ( exists $self->{results}->{$resultid} ) {
            delete $self->{results}->{$resultid};
            lock ${ $self->{freeresultidslock} };
            push @{ $self->{freeresultids} }, $resultid;
        }
    }

    return $result;
}

## Enqueues a new job.
## args:
##   The function name, which will be called in a worker thread
##   Args to be supplied to the function
## returns: 0
sub enqueue {
    my ( $self, $function, @jobargs ) = @_;
    return $self->T_enqueue( $function, 0, @jobargs );
}

## Enqueues a new job, and returns a resultid, which can be used to get the
## result of the function via waitforresult.
## args:
##   The function name, which will be called in a worker thread
##   Args to be supplied to the function
sub renqueue {
    my ( $self, $function, @jobargs ) = @_;
    return $self->T_enqueue( $function, 1, @jobargs );
}

# Common implementation of enqueue / renqueue.
# args: function name, flag (> 0: keep the result), arguments for the function
# returns: the resultid, or 0 if no result is kept
# Blocks while more than maxpending jobs are pending.
sub T_enqueue {
    my ( $self, $function, $result, @jobargs ) = @_;

    Carp::croak("threadpool: the function must be given by name, not by reference")
        if ref $function;

    share( my @args );
    @args = map { _shareable($_) } @jobargs;

    share( my %job );
    $job{function} = $function;
    $job{args}     = \@args;
    $job{result}   = $result;

    my $resultid = 0;
    if ( $result > 0 ) {    # Want the result saved
        {
            lock ${ $self->{freeresultidslock} };
            $resultid = shift @{ $self->{freeresultids} };
        }
        if ( !defined $resultid ) {
            lock ${ $self->{nextresultid} };
            $resultid = ${ $self->{nextresultid} };
            ${ $self->{nextresultid} }++;
        }
        $job{resultid} = $resultid;

        share( my %slot );
        $slot{state} = 0;    # No result yet ...
        {
            lock ${ $self->{resultslock} };
            $self->{results}->{$resultid} = \%slot;
        }
    }

    lock ${ $self->{poolcount} };
    {
        lock ${ $self->{morework} };
        lock ${ $self->{poollock} };
        push @{ $self->{pool} }, \%job;
        cond_signal ${ $self->{morework} };
    }
    ${ $self->{poolcount} }++;

    # cond_wait may return without a matching signal, so re-check the limit.
    while ( ${ $self->{poolcount} } > ${ $self->{maxpending} } ) {
        _debug("Waiting, poolcount: ${$self->{poolcount}}\n");
        cond_wait ${ $self->{poolcount} };
        _debug("Waited, poolcount: ${$self->{poolcount}}\n");
    }

    return $resultid;
}

## Returns the current number of working threads.
sub threadsworking {
    my $self = shift;
    lock ${ $self->{workingthreads} };
    return ${ $self->{workingthreads} };
}

## Blocks until all jobs are done.
sub wait {
    my $self = shift;

    lock ${ $self->{workingthreads} };
    while (1) {
        my $pending;
        {
            lock ${ $self->{poolcount} };
            $pending = ${ $self->{poolcount} };
        }
        return if ${ $self->{workingthreads} } + $pending <= 0;
        cond_wait ${ $self->{workingthreads} };
    }
}

# TEST CODE #######################################################################################

1;
Log In?
Username:
Password:

What's my password?
Create A New User
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others exploiting the Monastery: (6)
As of 2014-10-01 18:20 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    What is your favourite meta-syntactic variable name?














    Results (32 votes), past polls