in reply to Threads and LWP::UserAgent question
At the risk of getting strangled for this..
Some time ago I wrote my own threadpool implementation..
I was curious whether my implementation has the same problems..
I cannot tell why, but the version below works as expected,
while something mysterious (I didn't look into the sources either) seems to happen in Thread::Pool.
I also added a small delay between the requests.
michael
Some time ago I wrote my own threadpool implementation..
I was curious whether my implementation has the same problems..
I cannot tell why, but the version below works as expected,
while something mysterious (I didn't look into the sources either) seems to happen in Thread::Pool.
I also added a small delay between the requests.
#!/usr/bin/perl -w
use strict;
use warnings;

package threadpool;

# A test implementation of a pool of threads,
# storing the results of the threads' jobs in a queue.
# This is work in progress, please comment.
#
# Usage:
#   my $pool = threadpool->new( OPTIONS );    # threadpool::new( OPTIONS ) works too
#   $pool->enqueue( "main::functiontorun", @arguments_for_the_function );
#   my $resultid = $pool->renqueue( "main::functiontorun", @arguments_for_the_function );
#   print $pool->waitforresult( $resultid );
#   $pool->wait();        # Waits until all jobs are done
#   $pool->shutdown();    # Destroys all threads
#
# Every job function is called as function( @arguments, "\n", $thread_number ),
# i.e. it receives two trailing arguments in addition to the enqueued ones.

use threads;
use threads::shared;

#use forks;
#use forks::shared;
#use Storable;

## Initializes the threadpool.
## args: (named; a leading dash on the option name is optional)
##   maxthreads:   maximum number of threads (default: 10)
##   maxpending:   how many jobs may be enqueued; if you try to enqueue more
##                 jobs, enqueue will block until a job has been taken
##                 (default: 20)
##   startthreads: threads to start on startup (default: 5, capped at
##                 maxthreads)
## returns: the pool object
sub new {

    # Accept both threadpool->new(%opts) and the plain function call
    # threadpool::new(%opts): an odd argument count means an invocant leads.
    my $class = ( @_ % 2 ) ? shift : __PACKAGE__;
    $class = ref($class) || $class;

    my %raw_args = @_;
    my %args;
    for my $key ( keys %raw_args ) {
        ( my $name = $key ) =~ s/\A-//;    # "-maxthreads" and "maxthreads" are the same option
        $args{$name} = $raw_args{$key};
    }

    # The object itself is a shared hash so that worker threads see the same state.
    my %state;
    share %state;
    my $self = bless \%state, $class;

    # Each of these is a shared scalar used as a counter, a flag, a plain
    # lock, or a condition variable (sometimes several of those at once).
    foreach my $k (
        qw/maxthreads maxpending threads freethreads workingthreads
        threadstart poollock poolcount morework freeresultidslock
        nextresultid shutdown resultslock/
      )
    {
        share my $s;
        $self->{$k} = \$s;
    }

    ${ $self->{maxthreads} }     = 10;
    ${ $self->{maxpending} }     = 20;
    ${ $self->{threads} }        = 0;
    ${ $self->{freethreads} }    = 0;
    ${ $self->{workingthreads} } = 0;
    ${ $self->{nextresultid} }   = 1;
    ${ $self->{shutdown} }       = 0;
    ${ $self->{poolcount} }      = 0;

    share my @pool;                 # queue of pending jobs
    $self->{pool} = \@pool;
    share my %results;              # resultid => { state, result }
    $self->{results} = \%results;
    share my @freeresultids;        # result ids that may be reused
    $self->{freeresultids} = \@freeresultids;

    if ( defined $args{maxthreads} ) {
        ${ $self->{maxthreads} } = $args{maxthreads};
    }
    if ( defined $args{maxpending} ) {
        ${ $self->{maxpending} } = $args{maxpending};
    }

    my $start = defined $args{startthreads} ? $args{startthreads} : 5;
    if ( ${ $self->{maxthreads} } < $start ) {
        $start = ${ $self->{maxthreads} };
    }

    # Hold the start-up lock while spawning, so that no worker can signal
    # before we are ready to wait for it.
    lock ${ $self->{threadstart} };
    my $started = 0;
    for ( 1 .. $start ) {
        my $thread = threads->create( \&T_thread, $self, 1 );
        if ( !$thread ) {
            warn "threadpool: could not create worker thread\n";
            next;
        }
        $thread->detach();
        $started++;
    }

    # Wait until every successfully created worker has registered itself.
    # The condition is checked *before* waiting, so starting zero threads
    # (or failing to create some) cannot block forever.
    while (1) {
        my $running;
        {
            lock ${ $self->{threads} };
            $running = ${ $self->{threads} };
        }
        last if $running >= $started;
        cond_wait( ${ $self->{threadstart} } );
    }

    return $self;
}

## Waits for all threads to finish their jobs and ends them.
## Sleeps for a quarter of a second after doing its job, in the hope that all
## threads will have cleaned up by then. That is, however, just cosmetic: it
## avoids the warning that threads were running while exiting the script.
sub shutdown {
    my $self = shift;

    $self->wait();
    print "thr_waiting\n";
    {
        lock ${ $self->{shutdown} };
        ${ $self->{shutdown} } = 1;
    }

    my $remaining;
    do {
        {
            # Wake every idle worker so that it notices the shutdown flag.
            lock ${ $self->{morework} };
            cond_broadcast( ${ $self->{morework} } );
        }
        print "loop\n";
        {
            # Only hold the lock while reading: exiting workers need it to
            # decrement the counter, so it must not be held during the sleep.
            lock ${ $self->{threads} };
            $remaining = ${ $self->{threads} };
        }
        if ( $remaining > 0 ) {
            print "waiting, threads: $remaining\n";
            select undef, undef, undef, 0.1;
        }
    } while ( $remaining > 0 );

    select undef, undef, undef, 0.25;
    return;
}

# A worker thread.
# args: the pool object, and a flag telling whether this thread has to count
#       itself in the pool's thread counter (threads spawned by other workers
#       have already been counted by their creator).
# Runs until the pool's shutdown flag is set.
sub T_thread {
    my $self  = shift;
    my $count = shift;
    my $init  = 1;    # true until the first "queue is empty" start-up signal was sent

    my $tn;           # thread number, only used for messages and passed to jobs
    {
        lock ${ $self->{threads} };
        ${ $self->{threads} }++ if $count;
        $tn = ${ $self->{threads} };
    }
    print "Thread number $tn started.\n";

    while (1) {
        {
            lock ${ $self->{freethreads} };
            ${ $self->{freethreads} }++;
        }

        # Fetch the next job, sleeping on "morework" while the queue is empty.
        my $job;
        while ( !defined $job ) {
            lock ${ $self->{morework} };
            {
                lock ${ $self->{poollock} };
                $job = shift @{ $self->{pool} };
            }
            next if defined $job;

            if ($init) {

                # Tell new() that this worker has reached its idle state.
                lock ${ $self->{threadstart} };
                cond_signal( ${ $self->{threadstart} } );
                $init = 0;
            }
            threads->yield();
            cond_wait( ${ $self->{morework} } );

            lock ${ $self->{shutdown} };
            if ( ${ $self->{shutdown} } ) {
                lock ${ $self->{freethreads} };
                lock ${ $self->{threads} };
                ${ $self->{freethreads} }--;
                ${ $self->{threads} }--;
                return;
            }
        }

        # Mark this thread as working *before* the job leaves the pending
        # count; otherwise wait() could observe "nothing pending, nobody
        # working" while this job is still about to run.
        {
            lock ${ $self->{workingthreads} };
            ${ $self->{workingthreads} }++;
        }
        {
            lock ${ $self->{poolcount} };
            lock ${ $self->{freethreads} };
            ${ $self->{freethreads} }--;
            ${ $self->{poolcount} }--;
            cond_signal( ${ $self->{poolcount} } );    # unblock a producer waiting in T_enqueue
        }

        # Test if there are still enough free threads..
        {
            lock ${ $self->{freethreads} };
            if ( ${ $self->{freethreads} } == 0 ) {    # No threads left for the work
                lock ${ $self->{maxthreads} };
                lock ${ $self->{threads} };
                if ( ${ $self->{maxthreads} } > ${ $self->{threads} } ) {
                    ${ $self->{threads} }++;
                    my $thread = threads->create( \&T_thread, $self, 0 );
                    if ($thread) {
                        $thread->detach();
                    }
                    else {
                        ${ $self->{threads} }--;       # creation failed: undo the count
                        warn "threadpool: could not create worker thread\n";
                    }
                }
            }
        }

        # Run the job. The function is given by name; \&{"name"} is exempt
        # from "strict refs". A dying job must not kill the worker, because
        # the counters below would then never be decremented and wait() /
        # shutdown() would block forever.
        my $code = \&{ $job->{function} };
        my $result;
        my $ok = eval {
            $result = $code->( @{ $job->{args} }, "\n", $tn );
            1;
        };
        if ( !$ok ) {
            warn "threadpool: job '$job->{function}' died: $@";
        }

        if ( $job->{result} ) {

            # NOTE(review): the result is stored in a shared hash, so it
            # presumably has to be a plain scalar or a shared reference -
            # confirm against the threads::shared documentation.
            lock ${ $self->{resultslock} };
            my $r = $self->{results}->{ $job->{resultid} };
            lock $r;
            $r->{state}  = 1;
            $r->{result} = $result;
            cond_signal($r);
        }

        {
            lock ${ $self->{workingthreads} };
            ${ $self->{workingthreads} }--;
            cond_signal( ${ $self->{workingthreads} } );    # wake wait()
        }
    }
}

## Returns the result of the supplied resultid,
## waits until the result is there.
## Returns undef (empty list in list context) if there is no such resultid.
## A resultid can be fetched only once; afterwards it is recycled.
sub waitforresult {
    my $self     = shift;
    my $resultid = shift;
    return if !defined $resultid;

    my $r;
    {
        lock ${ $self->{resultslock} };
        $r = $self->{results}->{$resultid};
    }
    return if !defined $r;    # No such resultid

    my $result;
    {
        lock $r;
        while ( $r->{state} == 0 ) {    # Wait for the result
            cond_wait($r);
        }
        $result = $r->{result};

        lock ${ $self->{resultslock} };
        delete $self->{results}->{$resultid};
        lock ${ $self->{freeresultidslock} };
        push @{ $self->{freeresultids} }, $resultid;
    }
    return $result;
}

## Enqueues a new job whose return value is discarded.
## args:
##   The function name, which will be called in the worker thread,
##   e.g. "main::function"
##   Args to be supplied to the function
sub enqueue {
    my $self = shift;
    return $self->T_enqueue( shift, 0, @_ );
}

## Enqueues a new job, and returns a resultid, which can be used to get the
## result of the function via waitforresult.
## args:
##   The function name, which will be called in the worker thread,
##   e.g. "main::function"
##   Args to be supplied to the function
sub renqueue {
    my $self = shift;
    return $self->T_enqueue( shift, 1, @_ );
}

# Common implementation of enqueue / renqueue.
# args: function name, flag "keep the result", arguments for the function.
# returns: the resultid (0 if the result is not kept).
# Blocks while more than maxpending jobs are queued.
sub T_enqueue {
    my $self        = shift;
    my $function    = shift;
    my $want_result = shift;

    share my @args;
    @args = @_;

    my %job;
    share %job;
    $job{function} = $function;
    $job{args}     = \@args;
    $job{result}   = $want_result;

    my $resultid = 0;
    if ( $want_result > 0 ) {    # Want the result saved
        {
            lock ${ $self->{freeresultidslock} };
            $resultid = shift @{ $self->{freeresultids} };    # reuse an id if possible
        }
        if ( !defined $resultid ) {
            lock ${ $self->{nextresultid} };
            $resultid = ${ $self->{nextresultid} }++;
        }
        $job{resultid} = $resultid;

        share my %slot;
        $slot{state} = 0;        # No result yet ...
        {
            lock ${ $self->{resultslock} };
            $self->{results}->{$resultid} = \%slot;
        }
    }

    lock ${ $self->{poolcount} };
    {
        lock ${ $self->{morework} };
        lock ${ $self->{poollock} };
        push @{ $self->{pool} }, \%job;
        cond_signal( ${ $self->{morework} } );
    }
    ${ $self->{poolcount} }++;

    # Back-pressure: re-check the condition after every wake-up instead of
    # trusting a single signal.
    while ( ${ $self->{poolcount} } > ${ $self->{maxpending} } ) {
        print "Waiting, poolcount: ${$self->{poolcount}}\n";
        cond_wait( ${ $self->{poolcount} } );
        print "Waited, poolcount: ${$self->{poolcount}}\n";
    }
    return $resultid;
}

## Returns the current number of working threads.
## Note that some threads could be out of your function,
## but still have some work to do within this module.
sub threadsworking {
    my $self = shift;
    lock ${ $self->{workingthreads} };
    return ${ $self->{workingthreads} };
}

## Returns the current number of jobs, in the queue and in work.
## Note that some threads could be out of your function,
## but still have some work to do within this module.
sub jobs {
    my $self = shift;
    lock ${ $self->{poolcount} };
    lock ${ $self->{freethreads} };
    lock ${ $self->{threads} };
    return ${ $self->{poolcount} }
      + ( ${ $self->{threads} } - ${ $self->{freethreads} } );
}

## Returns the number of jobs currently in queue.
sub pendingjobs {
    my $self = shift;
    lock ${ $self->{poolcount} };
    return ${ $self->{poolcount} };
}

## Blocks until all jobs are done.
sub wait {
    my $self = shift;

    # cond_wait releases this lock while sleeping and re-acquires it on wake-up.
    lock ${ $self->{workingthreads} };
    while (1) {
        my $pending;
        {
            lock ${ $self->{poolcount} };
            $pending = ${ $self->{poolcount} };
        }
        return if ${ $self->{workingthreads} } + $pending <= 0;
        cond_wait( ${ $self->{workingthreads} } );
    }
}

# TEST CODE #######################################################################################
package main;

# CPAN modules
#use Thread::Pool;
#use threadpool;
use LWP::UserAgent;
use HTML::LinkExtor;
use URI::URL;

# Link collector filled by callback() and read by get(). It is not shared,
# so each thread works on its own copy.
my @links;
my $p = HTML::LinkExtor->new( \&callback );

my $pool = threadpool->new();    # Start up some threads
$pool->enqueue( 'main::XXX', "http://www.yahoo.com" );
$pool->enqueue( 'main::XXX', "http://www.google.de" );
$pool->wait;                     # wait for all threads to finish

#### subroutines

# Crawls starting at the given URL: fetches a page, queues all links found on
# it and continues with the most recently queued link until the queue is empty.
# args: the start URL (further arguments are ignored)
sub XXX {
    my $current_url = $_[0];
    my @queue;

    while ( defined $current_url && $current_url ne "" ) {
        my @found = get("$current_url");
        print "links in url\n********************\n";
        print join( "\n", @found ), "\n";
        push @queue, @found;

        #print "links in queue\n********************\n";
        #print join("\n", @queue), "\n";

        $current_url = pop @queue;    # undef once the queue is empty
        print ">>>>new url=", ( defined $current_url ? $current_url : "" ), "\n";    #this is where we go next
    }
    return;
}

# Callback for HTML::LinkExtor that collects the links of <a ...> tags.
sub callback {
    my ( $tag, %attr ) = @_;
    return if $tag ne 'a';    # we only look closer at <a ...>
    push @links, values %attr;
    return;
}

# Fetches the given URL and returns the links found on that page as
# absolute URLs.
sub get {
    my $url = "$_[0]";

    @links = ();              # only report the links of *this* page
    select undef, undef, undef, 0.2;    # small delay between the requests
    print "Entering html::getlinks, url=$url\n";
    my $ua = LWP::UserAgent->new;
    print "1\n";

    # The content is handed to the parser chunk by chunk while it arrives.
    my $res = $ua->request( HTTP::Request->new( GET => $url ),
        sub { $p->parse( $_[0] ) } );
    $p->eof;                  # finish this document so the parser can be reused
    print "res: ", $res->content(), "\n";
    print "links:", join( "\n", @links );

    my $base = $res->base;

    # Expand all URLs to absolute ones
    @links = map { url( $_, $base )->abs } @links;
    print "links:", join( "\n", @links );
    return @links;
}

our $version = 1;
1;
|
---|
Replies are listed 'Best First'. | |
---|---|
Re^2: Threads and LWP::UserAgent question
by BrowserUk (Patriarch) on Jun 10, 2008 at 03:26 UTC | |
by misc (Friar) on Jun 10, 2008 at 10:49 UTC | |
by gatito (Novice) on Jun 12, 2008 at 16:22 UTC | |
Re^2: Threads and LWP::UserAgent question
by gatito (Novice) on Jun 09, 2008 at 23:59 UTC | |
by misc (Friar) on Jun 10, 2008 at 10:50 UTC |
In Section
Seekers of Perl Wisdom