Beefy Boxes and Bandwidth Generously Provided by pair Networks
P is for Practical

Re: Threads and LWP::UserAgent question

by misc (Pilgrim)
on Jun 09, 2008 at 13:12 UTC ( #691029=note: print w/replies, xml ) Need Help??

in reply to Threads and LWP::UserAgent question

Endangering myself getting strangled..

Some time ago I wrote my own threadpool implementation..
I was curious whether my implementation has the same problems..
I cannot tell why, but the version below works as expected,
while some mysterious things (I also didn't look into the sources) seem to happen in Thread::Pool.

I also added a small delay between the requests.

#!/usr/bin/perl -w #use strict; package threadpool; # A test implementation of a pool of threads, # Storing the results of the threads' jobs in a queue # this is work in progress, please comment # Usage: my $pool = threadpool::new( OPTIONS ) # $pool->enqueue("main::functiontorun", @arguments_for_the_function ); # my $resultid = $pool->renqueue("main::functiontorun", @arguments_for +_the_function ); # print $pool->waitforresult( $resultid ); # $pool->wait(); # Waits until all jobs are done # $pool->shutdown(); # Destroys all threads use threads; use threads::shared; #use forks; #use forks::shared; #use Storable; ## Inititializes the threadpool ## args: (named) ## -maxthreads: maximum number of threads (default:10) ## -maxpending: How many jobs may be enqueued, if you try to enqeue mo +re jobs enqueue will block until a job has been done (default:20) ## -startthreads: Threads to start on startup (default:5) sub new{ my %args = @_; my $self = {}; bless $self; share %{$self}; foreach my $k ( qw/maxthreads maxpending threads freethreads w +orkingthreads threadstart poollock poolcount morework freeresultidslo +ck nextresultid shutdown resultslock/ ){ share my $s; $self->{$k} = \$s; } ${$self->{maxthreads}} = 10; ${$self->{maxpending}} = 20; ${$self->{threads}} = 0; ${$self->{freethreads}} = 0; ${$self->{workingthreads}} = 0; ${$self->{nextresultid}} = 1; ${$self->{shutdown}} = 0; ${$self->{poolcount}} = 0; share my @pool; $self->{pool} = \@pool; share my %results; $self->{results} = \%results; share my @freeresultids; $self->{freeresultids} = \@freeresultids; if ( defined( $args{maxthreads} ) ){ ${$self->{maxthreads}} = $args{maxthreads}; } if ( defined( $args{maxpending} ) ){ ${$self->{maxpending}} = $args{maxpending}; } if (! defined( $args{startthreads} ) ){ $args{startthreads} = 5; } if ( ${$self->{maxthreads}} < $args{startthreads} ){ $args{startthreads} = ${$self->{maxthreads}}; } lock ${$self->{threadstart}}; for ( 1.. $args{startthreads}){ my $t = threads->create("T_thread", $self, 1); $t->detach(); } my $threads; do { # Wait until all threads have been started and are waitin +g for jobs cond_wait ${$self->{threadstart}}; { lock ${$self->{threads}}; $threads = ${$self->{threads}}; } } while ( $threads < $args{startthreads} ); return $self; } ## Waits for all threads to finish their jobs and ends them ## sleeps for 1 second after doing his job, in the hope, that all thre +ads will have cleaned up. ## Is, however, just for the cosmetic of not beeing warned that thread +s were running while exiting the script. sub shutdown{ my $self = shift; $self->wait(); print "thr_waiting\n"; { lock ${$self->{shutdown}}; ${$self->{shutdown}} = 1; } my $t; do { { lock ${$self->{morework}}; cond_broadcast ${$self->{morework}}; } print "loop\n"; #{ lock ${$self->{threads}}; $t = ${$self->{threads}}; #} if ( $t > 0 ){ print "waiting, threads: ${$self->{threads}}\n +"; select undef,undef,undef,0.1; #cond_wait ${$self->{threads}}; } } while ( $t > 0 ); select undef,undef,undef,0.25; } # A worker thread sub T_thread{ my $self = shift; my $count = shift; my $init = 1; my $tn; { lock ${$self->{threads}}; $tn = ${$self->{threads}}; } if ( $count ){ lock ${$self->{threads}}; ${$self->{threads}}++; $tn = ${$self->{threads}}; } print "Thread number $tn started.\n"; while ( 1 ){ { lock ${$self->{freethreads}}; ${$self->{freethreads}}++; } my $job; { do { lock ${$self->{morework}}; #$dolock = 1; { lock ${$self->{poollock}}; $job = shift @{$self->{pool}}; } if ( !defined( $job )){ if ( $init ){ lock ${$self->{threadstart}}; cond_signal ${$self->{threadst +art}}; $init = 0; } #print "morework\n"; threads->yield(); cond_wait ${$self->{morework}}; { #print "lock shutdown\n"; lock ${$self->{shutdown}}; if ( ${$self->{shutdown}} ){ #print "shutting down. +\n"; lock ${$self->{freethr +eads}}; lock ${$self->{threads +}}; ${$self->{freethreads} +} --; ${$self->{threads}} -- +; #cond_signal ${$self-> +{threads}}; #print "thread exit\n" +; return; } } } } while ( !defined( $job ) ); lock ${$self->{poolcount}}; + lock ${$self->{freethreads}}; ${$self->{freethreads}}--; ${$self->{poolcount}} --; cond_signal ${$self->{poolcount}}; } # Test if there are still enough freethreads.. { lock ${$self->{freethreads}}; # ${$self->{freethreads}}--; #print "thread: freethreads ${$self->{freethre +ads}}\n"; if ( ${$self->{freethreads}} == 0 ){ # No thre +ads left for the work lock ${$self->{maxthreads}}; lock ${$self->{threads}}; if ( ${$self->{maxthreads}} > ${$self- +>{threads}} ){ lock ${$self->{threads}}; ${$self->{threads}}++; my $thread = threads->create(" +T_thread", $self, 0); $thread->detach(); } } } { lock ${$self->{workingthreads}}; ${$self->{workingthreads}}++; } my $result = &{$job->{function}}(@{$job->{args}},"\n", +$tn); if ( $job->{result} ){ #share $result; lock ${$self->{resultslock}}; my $r = $self->{results}->{$job->{resultid}}; lock $r; #my $r = $T_results{$job->{resultid}}; $r->{state} = 1; $r->{result} = $result; cond_signal $r; } { lock ${$self->{workingthreads}}; ${$self->{workingthreads}}--; cond_signal ${$self->{workingthreads}}; } } } ## returns the result of the supplied resultid, ## waits until the result is there. ## returns undef if there is no such resultid. sub waitforresult{ my $self = shift; my $resultid = shift; my $r; { lock ${$self->{resultslock}}; $r = $self->{results}->{$resultid}; } if ( !defined($r)){ return; # No such resultid } my $result; { lock $r; while ( $r->{state} == 0 ){ # Wait for the result cond_wait $r; } $result = $r->{result}; lock ${$self->{resultslock}}; delete $self->{results}->{$resultid}; lock ${$self->{freeresultidslock}}; push @{$self->{freeresultids}}, $resultid; } return $result; } ## Enqueues a new job. ## args: ## The function name, which will be callen in the current context e.g. + "main::function" ## Args to be supplied to the function sub enqueue{ my $self = shift; $self->T_enqueue( shift, 0, @_ ); } ## Enqueues a new job, and returns a resultid, which can be used to ge +t the result of the function via threadpool_waitforresult or threadpo +ol_getresult ## args: ## The function name, which will be callen in the current context. e.g +. "main::function" ## Args to be supplied to the function sub renqueue{ my $self = shift; return $self->T_enqueue( shift, 1, @_ ); } sub T_enqueue{ my $self = shift; my $function = shift; my $result = shift; share my @args; @args = @_; my %hash; share %hash; share $function; $hash{function} = $function; $hash{args} = \@args; share $result; $hash{result} = $result; my $resultid = 0; if ( $result > 0 ){ # Want the result saved { lock ${$self->{freeresultidslock}}; $resultid = shift @{$self->{freeresultids}}; } if ( !defined( $resultid ) ){ { lock ${$self->{nextresultid}}; $resultid = ${$self->{nextresultid}}; ${$self->{nextresultid}} ++; } } share $resultid; $hash{resultid} = $resultid; share my %h; $h{state} = 0; # No result yet ... { lock ${$self->{resultslock}}; $self->{results}->{$resultid} = \%h; } } #print "enqueued: ", @{$hash{args}},"\n","@args","\n"; lock ${$self->{poolcount}}; { lock ${$self->{morework}}; lock ${$self->{poollock}}; push @{$self->{pool}}, \%hash; cond_signal ${$self->{morework}}; } ${$self->{poolcount}} ++; if ( ${$self->{poolcount}} > ${$self->{maxpending}} ){ print "Waiting, poolcount: ${$self->{poolcount}}\n"; cond_wait ${$self->{poolcount}}; print "Waited, poolcount: ${$self->{poolcount}}\n"; } return $resultid; } ## Returns the current number of working threads ## There's to remark that some threads could be out of your function, ## but still have some work to do within this module sub threadsworking{ my $self = shift; lock ${$self->{workingthreads}}; return ${$self->{workingthreads}}; } ## Returns the current number of jobs, in the queue and in work ## There's to remark that some threads could be out of your function, ## but still have some work to do within this module sub jobs{ my $self = shift; lock ${$self->{poolcount}}; lock ${$self->{freethreads}}; lock ${$self->{threads}}; return ( ${$self->{poolcount}} + ( ${$self->{threads}} - ${$se +lf->{freethreads}} ) ); } ## Returns the number of jobs currently in queue sub pendingjobs{ my $self = shift; lock ${$self->{poolcount}}; return ${$self->{poolcount}}; } ## blocks until all jobs are done sub wait{ my $self = shift; while ( 1 ){ { lock ${$self->{workingthreads}}; my $pending; { lock ${$self->{poolcount}}; $pending = ${$self->{poolcount}}; } if ( ${$self->{workingthreads}} + $pending > 0 + ){ cond_wait ${$self->{workingthreads}}; } else { return; } } } } # TEST CODE ########################################################## +############################# package main; # CPAN modules #use Thread::Pool; #use threadpool; use LWP::UserAgent; use HTML::LinkExtor; use URI::URL; my $p = HTML::LinkExtor->new(\&callback); $pool = threadpool->new(); # Start up some threads $pool->enqueue( 'main::XXX', "" ); $pool->enqueue( 'main::XXX', "" ); $pool->wait; # wait for all threads to finish #### subroutines sub XXX { $current_url = @_[0]; @queue = (); while( $current_url ne "" ){ @links = get("$current_url" ); print "links in url\n********************\n"; print join("\n", @links), "\n"; push(@queue,@links); #print "links in queue\n********************\n"; #print join("\n", @queue), "\n"; $current_url = pop(@queue); print ">>>>new url=$current_url\n"; #this is where we go next } } my @links = (); # Set up a callback that collect links sub callback { my($tag, %attr) = @_; return if $tag ne 'a'; # we only look closer at <a ...> push(@links, values %attr); } sub get { $url = "@_[0]"; select undef,undef,undef,0.2; print "Entering html::getlinks, url=$url\n"; my $ua = LWP::UserAgent->new; print "1\n"; $res = $ua->request(HTTP::Request->new(GET => $url), sub {$p->pars +e($_[0])}); print "res: ",$res->content(),"\n"; print "links:",join("\n",@links); $base = $res->base; # Expand all URLs to absolute ones @links = map { $_ = url($_, $base)->abs; } @links; print "links:",join("\n",@links); return @links; } $version = 1; 1;

Replies are listed 'Best First'.
Re^2: Threads and LWP::UserAgent question
by BrowserUk (Pope) on Jun 10, 2008 at 03:26 UTC

    Your module certainly contains less puerile cruft (optimise=>cpu/memory which does and has never done anything; the whole Thread::Conveyor 'abstraction to far' and much more), than Thread::Pool.

    Minus all that cruft, your module stands at least a fighting chance of being debuggable (Ie. maintainable).

    It does however exhibit a few um...peculiarities. Like

    while ( 1 ){ { lock ${$self->{working threads}}; my $pending; { lock ${$self->{poolcount}}; $pending = ${$self->{poolcount}}; } if ( ${$self->{working threads}} + $pending > 0 ){ cond_wait ${$self->{workingthreads}}; } else { return; } } }

    I assume that the inner anonymous block is intended to restrict the range of effect of the lock statement, but given there is nothing outside of that anonymous block, within the encompassing while block, it is entirely superfluous.

    In new(), you have ${$self->{poolcount}} = 0;. At other points in the code you have lock ${$self->{poolcount}}; and ${$self->{poolcount}} --; and cond_signal ${$self->{poolcount}}; etc.

    The elements of hashes are scalars. Why are you autovivifying scalar references into those scalars? As best as I can tell, you're just complicating the code and making it run more slowly.

    These and similar constructs are rife throughout the module. You also appear to be doing far more locking than is necessary, but that's instinct rather than careful analysis talking.

    But the main problem with your modules is that (on the basis of inspection at least), it appears to have one major, fundamental flaw in common with Thread::Pool. It completely misses the point of "pooling threads". Ithreads are relatively expensive to both start and destroy--a side effect of emulation forking. Badly.

    The benefit of using a "pool of iThreads", is that you avoid those overheads, by starting long-running threads that loop over a shared source of 'work tokens', (usually by polling a mod::Thread::Queue) and 're-using' existing threads to perform repetitive tasks.

    As best I can tell without having run your code, in common with Thread::Pool, you are discarding old threads and starting new ones for each repetition of a basic task. Like disposable nappies, this may be convenient, but it is wasteful of resources.

    In addition, there are immense problems with the implementation of the OPs code--by the OP and by cut&paste, by your "test code". These mostly revolve around--as correctly identified by moritz--the use of global, and package global variables which appear to allow the main thread to share the links scavenged, but in reality result in a bunch of thread-uniquely accessible lists, which no other thread can ever access, and which get discard when you throw away the originating thread.

    In essence, as coded, all of the work done to extract the lists of links in the threads is entirely wasted, because they are deposited into a CLONED, non-shared, thread-specific piece of storage, that gets discard as soon as the work has been done to build it.

    Sorry to be negative, but as it stands, your 400 line module could be better replaced with 15 lines of direct threads code. Better, because the result would be a program that worked, rather than just ran.

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      Yes, there are probably too many locks.

      When I wrote this module some time ago I had to struggle with deadlocks.
      So I finished with locking all variables always in the same order,
      also in mind it would be easier to insert new functions into the code.

      When the whole thing worked, I didn't mind to clean up..

      At the place you mentioned: Do you mean cond_wait would also unlock ${$self->{poolcount}} or that locking ${$self->{poolcount}} is simply unneccessary ?
      I didn't trie so much to have few locks, would you say that a lock is big penalty ?

      I used the hash $self everywhere because I had in mind to be able to have more than one threadpool in an script.

      You are however wrong about the creating of the threads,
      the module doesn't destroy any thread at all.
      Instead it creates up to maxthread threads and puts the work recieved with enqueue() onto a queue.
      The threads pull in T_thread() the work of the queue.

      I wrote this module, because I had some problems with growing memory of long running scripts, due to creating and destroying threads.

      The OP's code..
      The only thing I've changed has been the small delay (select undef,undef,undef,0.2;).
      Since the OP didn't say exactly what he tries to do and where all the data should go, I didn't had a closer look, instead I really did copy'n paste.

      I've just been curious whether I could replace Thread::Pool with my module,
      since this worked without problems I thought my implementation could be helpful.
      There's also the point that the feedback, especially yours, helps me improve my code.

      So, no, I don't feel you would be negative.
        my original code was a simple version of a spider I'm working on. It was badly written and turned out to be the wrong thing actually. So, I've been cobbling a new version together.
Re^2: Threads and LWP::UserAgent question
by gatito (Novice) on Jun 09, 2008 at 23:59 UTC
    Huh, your code works fine on my PC desktop, and doesn't work on my pc laptop. You should submit that to CPAN as a replacement for the current garbage.
      What do you mean with it doesn't work on your laptop ?
      Are there any error messages ?

Log In?

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://691029]
and all is quiet...

How do I use this? | Other CB clients
Other Users?
Others taking refuge in the Monastery: (7)
As of 2018-06-19 02:54 GMT
Find Nodes?
    Voting Booth?
    Should cpanminus be part of the standard Perl release?

    Results (111 votes). Check out past polls.