Beefy Boxes and Bandwidth Generously Provided by pair Networks
Keep It Simple, Stupid
 
PerlMonks  

Re: Perl Threads Question.

by NetWallah (Canon)
on Oct 04, 2010 at 16:22 UTC ( [id://863370]=note: print w/replies, xml ) Need Help??


in reply to Perl Threads Question.

Since I have seen threads related questions a few times in recent months, I offer the "Threads::Simple" module. Comments welcome.

Is this cpan-worthy ? (This would be my first, if so). Namespace suggestions also welcome.

package Thread::Simple; use 5.008; use strict; use threads; use warnings; use Carp; use Thread::Queue; our $VERSION = '0.02'; sub new{ my $class= shift; my $self = {@_}; # Take all parms passed in as hashref. # Expect WORKER=>&subref, N=>$int $self->{REQUEST_QUEUE} = new Thread::Queue; # Get work from here $self->{RESULT_QUEUE} = new Thread::Queue; # Worker Places resul +ts here $self->{STATS_QUEUE} = new Thread::Queue; # Worker Places threa +d Staistics here $self->{CLASS} = $class; $self->{STATE} = "RUNNING"; bless $self, $class; $self->{WORKER} and setworker($self); return $self; } sub setworker{ my ($self,$workref,@params) = @_; $self->{WORKER} = $workref if $workref; if ( @params ){ # Make it empty, if undef $self->{WORK_PARAMS} = \@params ; }else{ $self->{WORK_PARAMS} ||= []; } for (0..$self->{N} - 1 ){ $self->{THISTHREADNBR} = $_; $self->{CONTROL_QUEUE}[$_] = new Thread::Queue; # Send CONTROL +messages to worker push @{$self->{THREADPOOL}} ,threads->create(\&_internal_worke +r, $self) ; } } sub Pause_workers{ my ($self, $msg,$autoresume,$delay) = @_; die "Misuse of PAUSE - already Paused" if $self->{STATE} eq "PAUSE +D"; $_->enqueue($msg) for @{ $self->{CONTROL_QUEUE} }; $self->{REQUEST_QUEUE}->insert(0, (undef) x $self->{N} ); # Tell th +em to look at CTRL $self->{STATE} = "PAUSED"; sleep $delay if $delay; $self->Resume_workers() if $autoresume; } sub Resume_workers{ my $self = shift; die "Misuse of RESUME - we are NOT Paused" unless $self->{STATE} e +q "PAUSED"; $_->enqueue(undef) for @{ $self->{CONTROL_QUEUE} }; # Resumes work +ers $self->{STATE} = "RUNNING"; } sub _internal_worker{ my $self=shift; while ($_ = $self->getwork()){ $self->pushresult ( $self->{WORKER}->( $_, @{$self->{WOR +K_PARAMS}} ) ); } } sub feedwork{ my $self = shift; $self->{REQUEST_QUEUE}->enqueue (@_); } sub getwork{ my $self = shift; my $currentwork = $self->GetNextITEM("REQUEST_QUEUE"); if (defined $currentwork){ # There is work to be done .. $self->{WORKDONE}++; return $currentwork ; } # No current work - look for CONTROL msgs my $control_work = $self->{CONTROL_QUEUE}[$self->{THISTHREADNBR}] +->dequeue_nb; # Non Blk if (defined $control_work){ $self->{CONTROLMSGS}++; #Interpret/process the current message here #Then wait for next message, before proceeding $control_work = $self->{CONTROL_QUEUE}[$self->{THISTHREADNBR}] +->dequeue; # Blk; return $self->getwork(); # Recurse } # Main thread is telling us to quit # enqueued item MUST BE SIMPLE SCALAR (No aref etc) $self->{STATS_QUEUE}->enqueue ("$self->{THISTHREADNBR},$self->{WO +RKDONE},$self->{CONTROLMSGS}"); return undef;# Tell worker to quit } sub pushresult{ my $self = shift; $self->{RESULT_QUEUE}->enqueue (@_); } sub GetNextITEM{ my $self = shift; my $itemname = shift; # What queue to pull from... my $blocking = shift; defined $blocking or $blocking= 1; # Default is to BLOCK. Set to + 0 to NOT blk $blocking and return $self->{ $itemname }->dequeue; return $self->{ $itemname }->dequeue_nb; # NON-Blocking (und +ef at eof) } sub Quit_And_Get_Results{ my $self = shift; $self->feedwork( (undef) x $self->{N} ); # Tell them to quit $_->join for @{$self->{THREADPOOL}}; # Wait for kids return undef unless wantarray; my @results ; push @results, $_ while $_=$self->GetNextITEM("RESULT_QUEUE",0);# +Use NON-BLK return @results; } sub print_stats{ my $self = shift; my @results ; push @results, $_ while $_=$self->GetNextITEM("STATS_QUEUE",0);# +Use NON-BLK print "STATS: Thread $_\n" for sort @results; my $count = scalar @results; my (@freq,$totaltrans); for (@results){ my ($thread,$wrk,$ctrl)=split( /,/ , $_); $freq[ $wrk ]++ ; $totaltrans += $wrk; } my ($cumul, $cumul_th); for my $idx (0..$#freq) { next unless $freq[$idx]; $cumul_th += $freq[$idx]; $cumul += $freq[$idx] * $idx; printf "%3d = %3.1f%% Did %5d (%5d Cumul). %3d Threads did %3 +.1f %% of Trans.)\n", $freq[$idx] ,$freq[$idx]*100/$count , $idx ,$cumul, $cumul_th, $cumul *100/$totaltrans; } } 1; __END__ =head1 NAME Thread::Simple - A simple thread-pool implementation =head1 SYNOPSIS use Thread::Simple; =head1 DESCRIPTION C<Thread::Simple> provides a simple thread-pool implementaion without external dependencies outside core modules. Jobs can be submitted to and handled by multi-threaded `workers' managed by the pool. The processing model is that a series of individual units of data needs to be processed, in parallel. The result of processing (if prese +nt) can be returned, and will be collected. This module hides the "thread" and Thread::Queue" objects that are use +d in it's implementation. The caller/user sees only a highly simplified interface. =head1 SYNOPSIS use Thread::Simple; my $t = new Thread::Simple (N=>5, WORKER=> \&worksub); # 5 threads while( <get a unit of information for thread to work on> ) { $t->feedwork( $_ ); } # Get and process results for ( $t->Quit_And_Get_Results() ){ print "$_\n" ; # Or whatever you want to do with them } sub worksub{ # This is called by each thread my $work = shift; # Get one unit of work .. process the $work return "Some (optional) scalar result of processing $work"; } =head1 Selected METHODs new(N=><count>, WORKER=><subref>, <Other optional params>) Creates a Thread::Simple object. feedwork( $work_to_process) Adds to the queue of work to be done. Quit_And_Get_Results() Returns an array of results produced. The following methods are optional, for more control over processing: print_stats() can be called after Quit_And_Get_Results, to print out h +ow much work each thread did (for manual optimization of the N param). Pause_workers() and Resume_workers can temporarily suspend/resume work +. =head1 BUGS #Note: # If you see the error message: # Attempt to free unreferenced scalar: SV ..., Perl interpreter: . +.. during global destruction. # is due to a bug in threads. This bug is fixed in threads 1.63 =head1 AUTHOR NetWallah =head1 COPYRIGHT AND LICENSE Copyleft 2008 This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut

     Syntactic sugar causes cancer of the semicolon.        --Alan Perlis

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://863370]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others chanting in the Monastery: (4)
As of 2024-04-19 05:41 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found