PerlMonks  

Re^10: memory leaks with threads

by misc (Pilgrim)
on Jul 10, 2007 at 13:34 UTC


in reply to Re^9: memory leaks with threads
in thread memory leaks with threads

I submitted a bug report.

I did some benchmarking and measured the time taken to run a function 300 times.

The results are, again, most surprising to me.

With 10 worker threads in a pool I get this:

micha@laptop ~/prog/perl/test $ time ./threads_benchmark.pl
real 0m36.666s
user 1m11.872s
sys 0m0.172s
(resident memory 6.3MB)

#!/usr/bin/perl -w
use threads;
use threads::shared;
#use forks;
#use forks::shared;
#use Storable;

share $T_maxthreads;
$T_maxthreads = 10;
share $T_maxpending;
$T_maxpending = 20;
share $T_threads;
$T_threads = 0;
share $T_freethreads;
$T_freethreads = 0;
share $T_workingthreads;
$T_workingthreads = 0;
share $T_threadstart;
share @T_pool;
share $T_poollock;
share $T_poolcount;
$T_poolcount = 0;
share $T_morework;
share %T_results;
share $T_resultslock;
share @T_freeresultids;
share $T_freeresultidslock;
share $T_nextresultid;
$T_nextresultid = 1;
share $T_shutdown;
$T_shutdown = 0;

## Initializes the threadpool
## args: (named)
## -maxthreads: maximum number of threads (default:10)
## -maxpending: How many jobs may be enqueued; if you try to enqueue more jobs, enqueue will block until a job has been done (default:20)
## -startthreads: Threads to start on startup (default:5)
sub threadpool_init{
    my %args = @_;
    if ( defined( $args{maxthreads} ) ){
        $T_maxthreads = $args{maxthreads};
    }
    if ( defined( $args{maxpending} ) ){
        $T_maxpending = $args{maxpending};
    }
    if (! defined( $args{startthreads} ) ){
        $args{startthreads} = 5;
    }
    if ( $T_maxthreads < $args{startthreads} ){
        $args{startthreads} = $T_maxthreads;
    }
    lock $T_threadstart;
    for ( 1.. $args{startthreads}){
        my $t = threads->create("T_thread");
        $t->detach();
    }
    my $threads;
    do { # Wait until all threads have been started and are waiting for jobs
        cond_wait $T_threadstart;
        {
            lock $T_threads;
            $threads = $T_threads;
        }
    } while ( $threads < $args{startthreads} );
}

## Waits for all threads to finish their jobs and ends them.
## Sleeps for 0.25 seconds after doing its job, in the hope that all threads will have cleaned up.
## This is, however, purely cosmetic: it avoids the warning that threads were still running when the script exits.
sub threadpool_shutdown{
    threadpool_wait();
    print "thr_waiting\n";
    {
        lock $T_shutdown;
        $T_shutdown = 1;
    }
    my $t;
    do {
        {
            lock $T_morework;
            cond_broadcast $T_morework;
        }
        print "loop\n";
        #{
            lock $T_threads;
            $t = $T_threads;
        #}
        if ( $t > 0 ){
            print "waiting, threads: $T_threads\n";
            select undef,undef,undef,0.1;
            #cond_wait $T_threads;
        }
    } while ( $t > 0 );
    select undef,undef,undef,0.25;
}

# A worker thread
sub T_thread{
    my $init = 1;
    my $tn;
    {
        lock $T_threads;
        $T_threads++;
        $tn = $T_threads;
    }
    while ( 1 ){
        {
            lock $T_freethreads;
            $T_freethreads++;
        }
        my $job;
        {
            do {
                lock $T_morework;
                #$dolock = 1;
                {
                    lock $T_poollock;
                    $job = shift @T_pool;
                }
                if ( !defined( $job )){
                    if ( $init ){
                        lock $T_threadstart;
                        cond_signal $T_threadstart;
                        $init = 0;
                    }
                    #print "morework\n";
                    cond_wait $T_morework;
                    {
                        #print "lock shutdown\n";
                        lock $T_shutdown;
                        if ( $T_shutdown ){
                            #print "shutting down.\n";
                            lock $T_threads;
                            $T_threads --;
                            #cond_signal $T_threads;
                            #print "thread exit\n";
                            return;
                        }
                    }
                }
            } while ( !defined( $job ) );
            lock $T_poolcount;
            $T_poolcount --;
            cond_signal $T_poolcount;
        }
        # Test if there are still enough freethreads..
        {
            lock $T_freethreads;
            $T_freethreads--;
            #print "thread: freethreads $T_freethreads\n";
            if ( $T_freethreads == 0 ){ # No threads left for the work
                lock $T_maxthreads;
                lock $T_threads;
                if ( $T_maxthreads > $T_threads ){
                    my $thread = threads->create("T_thread");
                    $thread->detach();
                }
            }
        }
        {
            lock $T_workingthreads;
            $T_workingthreads++;
        }
        my $result = &{$job->{function}}(@{$job->{args}},"\n", $tn);
        if ( $job->{result} ){
            #share $result;
            lock $T_resultslock;
            my $r = $T_results{$job->{resultid}};
            lock $r;
            #my $r = $T_results{$job->{resultid}};
            $r->{state} = 1;
            $r->{result} = $result;
            cond_signal $r;
        }
        {
            lock $T_workingthreads;
            $T_workingthreads--;
            cond_signal $T_workingthreads;
        }
    }
}

## Returns the result for the supplied resultid,
## waiting until the result exists.
## Removes the result from the stack, so it may be called only once for every resultid.
## Returns undef if there is no such resultid.
sub threadpool_waitforresult{
    my $resultid = shift;
    my $r;
    {
        lock $T_resultslock;
        $r = $T_results{$resultid};
    }
    if ( !defined($r)){
        return;
    }
    my $result;
    {
        lock $r;
        while ( $r->{state} == 0 ){ # Wait for the result
            cond_wait $r;
        }
        $result = $r->{result};
        lock $T_resultslock;
        delete $T_results{$resultid};
        lock $T_freeresultidslock;
        push @T_freeresultids, $resultid;
    }
    return $result;
}

## Enqueues a new job.
## args:
## The function name, which will be called in the current context
## Args to be supplied to the function
sub enqueue{
    &T_enqueue( shift, 0, @_ );
}

## Enqueues a new job, and returns a resultid, which can be used to get the result of the function via threadpool_waitforresult or threadpool_getresult
## args:
## The function name, which will be called in the current context
## Args to be supplied to the function
sub renqueue{
    return &T_enqueue( shift, 1, @_ );
}

sub T_enqueue{
    my $function = shift;
    my $result = shift;
    share my @args;
    @args = @_;
    my %hash;
    share %hash;
    share $function;
    $hash{function} = $function;
    $hash{args} = \@args;
    share $result;
    $hash{result} = $result;
    my $resultid = 0;
    if ( $result > 0 ){ # Want the result saved
        {
            lock $T_freeresultidslock;
            $resultid = shift @T_freeresultids;
        }
        if ( !defined( $resultid ) ){
            {
                lock $T_nextresultid;
                $resultid = $T_nextresultid;
                $T_nextresultid ++;
            }
        }
        share $resultid;
        $hash{resultid} = $resultid;
        share my %h;
        $h{state} = 0; # No result yet ...
        {
            lock $T_resultslock;
            $T_results{$resultid} = \%h;
        }
    }
    #print "enqueued: ", @{$hash{args}},"\n","@args","\n";
    lock $T_poolcount;
    {
        lock $T_morework;
        lock $T_poollock;
        push @T_pool, \%hash;
        cond_signal $T_morework;
    }
    $T_poolcount ++;
    if ( $T_poolcount > $T_maxpending ){
        print "Waiting, poolcount: $T_poolcount\n";
        cond_wait $T_poolcount;
        print "Waited, poolcount: $T_poolcount\n";
    }
    return $resultid;
}

## Returns the current number of working threads
sub threadpool_threadsworking{
    lock $T_workingthreads;
    return $T_workingthreads;
}

## Blocks until all jobs are done
sub threadpool_wait{
    while ( 1 ){
        {
            lock $T_workingthreads;
            my $pending;
            {
                lock $T_poolcount;
                $pending = $T_poolcount;
            }
            if ( $T_workingthreads + $pending > 0 ){
                cond_wait $T_workingthreads;
            } else {
                return;
            }
        }
    }
}

# TEST CODE ###########################################################
sub function{
    my @args = @_;
    my $a = $args[0];
    my $b = 0;
    for ( 1..($a*1000) ){
        $b++;
        $b = $b/10;
        $b++;
        $b *= 10;
        $b = $b / ($a+1);
        $b = $b /1.2334645897;
        $b = $b * 234.2312125;
        $b = $b/$a;
        $b++;
        $b = $b * $a;
    }
    return ( $a + 100 );
}

print "Init.\n";
threadpool_init( startthreads=>10, maxthreads=>10, maxpending=>300 );
for my $a ( 1..300 ){
    my $r = enqueue( "function", $a );
}
#sleep 2;
print "Waiting now, threads working: ",threadpool_threadsworking,"\n";
#threadpool_wait();
threadpool_shutdown();
print "Exiting now, threads working: ",threadpool_threadsworking,"\n";



And here is creating and destroying 300 threads, with 10 running concurrently:

micha@laptop ~/prog/perl/test $ time ./threadpool.pl
real 0m33.001s
user 1m4.728s
sys 0m0.024s
(max resident memory rising indefinitely, 10MB)

#!/usr/bin/perl -w
use threads;

sub function{
    my @args = @_;
    #print "function\n",@args,"\n";
    #sleep int(rand(4));
    #sleep 1;
    #select( undef,undef,undef,0.25 );
    my $a = $args[0];
    my $b = 0;
    for ( 1..($a*1000) ){
        $b++;
        $b = $b/10;
        $b++;
        $b *= 10;
        $b = $b / ($a+1);
        $b = $b /1.2334645897;
        $b = $b * 234.2312125;
        $b = $b/$a;
        $b++;
        $b = $b * $a;
    }
    return ( $a + 100 );
}

my @threads;
for my $a ( 1..10 ){
    push @threads, threads->create( "function", $a );
}
for my $a ( 10..300 ){
    push @threads, threads->create( "function", $a );
    my $t = shift @threads;
    $t->join();
}
$_->join for @threads;


OK, my threadpool implementation is far from being perfect, but I had guessed the differences in speed would be more obvious.

By the way, I also benchmarked Thread::Pool:

Thread::Pool
micha@laptop ~/prog/perl/test $ time ./thread_pool_benchmark.pl
real 0m36.105s
user 1m9.780s
sys 0m0.048s
(resident memory 15MB)

#!/usr/bin/perl -w
use Thread::Pool;

sub function{
    my @args = @_;
    #print "function\n",@args,"\n";
    #sleep int(rand(4));
    #sleep 1;
    #select( undef,undef,undef,0.25 );
    my $a = $args[0];
    my $b = 0;
    for ( 1..($a*1000) ){
        $b++;
        $b = $b/10;
        $b++;
        $b *= 10;
        $b = $b / ($a+1);
        $b = $b /1.2334645897;
        $b = $b * 234.2312125;
        $b = $b/$a;
        $b++;
        $b = $b * $a;
    }
    return ( $a + 100 );
}

my $p = Thread::Pool->new( {
    optimize => 'cpu',       # default: memory
    do       => \&function,  # must have
    workers  => 10
} );
for my $a ( 1..300 ){
    $p->job( $a );
}
$p->shutdown();
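
One way to read the three `time` outputs side by side is to divide user time by real time, which gives a rough figure for how many cores were kept busy on average. The sketch below does only that arithmetic on the numbers quoted in this post; the labels and the `cpu_ratio` helper are made up for illustration. If all three ratios come out about the same, that would be consistent with the job being CPU-bound, so that thread management overhead hardly shows up, but that is an interpretation and not something these runs measure directly.

```perl
use strict;
use warnings;

# real/user seconds copied from the three `time` runs quoted in this post.
# The labels are illustrative only.
my @runs = (
    [ 'own thread pool',     36.666, 71.872 ],
    [ 'create/join per job', 33.001, 64.728 ],
    [ 'Thread::Pool',        36.105, 69.780 ],
);

# user/real: roughly how many cores were busy on average (hypothetical helper).
sub cpu_ratio {
    my ($real, $user) = @_;
    return $user / $real;
}

for my $run (@runs) {
    my ($name, $real, $user) = @$run;
    printf "%-20s real %7.3fs  user/real %.2f\n",
        $name, $real, cpu_ratio($real, $user);
}
```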




My thread pool implementation (the first benchmark and code listed) seems to be stable and not too bad,
and I believe it already has some advantages over Thread::Pool.
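
For comparison, here is a minimal sketch of the same fixed-size pool idea built on Thread::Queue instead of hand-rolled locks and condition variables. It is not the code benchmarked above: `work` and `run_pool` are hypothetical names, the job is trimmed down to its return value, and shutdown is done by queueing one undef per worker as a stop marker.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Stand-in for the benchmarked function (hypothetical, trimmed to the return value).
sub work {
    my ($n) = @_;
    return $n + 100;
}

# Run work() on every job using a fixed number of worker threads.
# Returns a hash mapping each job argument to its result.
sub run_pool {
    my ($workers, @jobs) = @_;
    my $job_q    = Thread::Queue->new;
    my $result_q = Thread::Queue->new;

    my @threads = map {
        threads->create(sub {
            # dequeue blocks until an item arrives; undef is the stop marker.
            while (defined(my $n = $job_q->dequeue)) {
                $result_q->enqueue("$n " . work($n));
            }
        });
    } 1 .. $workers;

    $job_q->enqueue(@jobs);
    $job_q->enqueue((undef) x $workers);   # one stop marker per worker
    $_->join for @threads;

    # All workers have finished, so a non-blocking drain sees every result.
    my %out;
    while (defined(my $item = $result_q->dequeue_nb)) {
        my ($n, $r) = split ' ', $item;
        $out{$n} = $r;
    }
    return %out;
}

my %r = run_pool(10, 1 .. 300);
printf "%d results, job 300 returned %d\n", scalar(keys %r), $r{300};
```

The queue does the blocking and waking that `$T_morework` and `$T_poollock` handle by hand in the first listing. The workers are joined here and not detached, so no sleep is needed before exit.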

Is there a proposed way to publish such code, perhaps in Snippets ?

Update: Put the scripts in readmore tags, as suggested. I'm also going to publish my ongoing work on my scratchpad.
Comments welcome.


Re^11: memory leaks with threads
by BrowserUk (Pope) on Jul 10, 2007 at 13:57 UTC
    Is there a proposed way to publish such code, perhaps in Snippets ?

    Personally, I think adding your code (all 3 versions) to your post above in separate <readmore><code>...</code></readmore> blocks would be good. It'd put the benchmark figures into context, and make the code available for searching for those that follow.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
