Since I have seen thread-related questions a few times in recent months, I offer the "Thread::Simple" module. Comments welcome.
Is this CPAN-worthy? (If so, this would be my first.) Namespace suggestions are also welcome.
package Thread::Simple;
use 5.008;
use strict;
use threads;
use warnings;
use Carp;
use Thread::Queue;
our $VERSION = '0.02';
# Constructor: builds the pool object and, when a WORKER is supplied,
# starts the worker threads right away.
#
# Parameters (key => value pairs, stored verbatim on the object):
#   N      => number of worker threads to start
#   WORKER => code reference invoked once per unit of work
#   any other pairs are kept on the object unchanged
#
# Returns the blessed pool object.
sub new {
    my $class = shift;
    my $self  = {@_};    # Take all parms passed in as a hashref.

    $self->{REQUEST_QUEUE} = Thread::Queue->new;    # Workers get work from here
    $self->{RESULT_QUEUE}  = Thread::Queue->new;    # Workers place results here
    $self->{STATS_QUEUE}   = Thread::Queue->new;    # Workers place thread statistics here
    $self->{CLASS}         = $class;
    $self->{STATE}         = "RUNNING";
    bless $self, $class;

    # Threads are only started when a worker sub is already known;
    # otherwise setworker() can be called on the object afterwards.
    $self->setworker() if $self->{WORKER};
    return $self;
}
# Registers the worker sub (and optional extra parameters) and starts
# the N worker threads, each with its own control queue.
#
# Parameters:
#   $workref - code reference run for every unit of work; optional when
#              a WORKER was already stored on the object
#   @params  - extra arguments passed to the worker after the work unit;
#              when omitted, previously stored WORK_PARAMS are kept
#
# Croaks when the threads were already started, when no worker is
# known, or when N is not a positive integer.
sub setworker {
    my ( $self, $workref, @params ) = @_;

    # A second set of threads would never receive a quit signal from
    # Quit_And_Get_Results (it only sends N of them), so joining would hang.
    croak "setworker: worker threads have already been started"
        if $self->{THREADPOOL} && @{ $self->{THREADPOOL} };

    my $worker = $workref || $self->{WORKER};
    croak "setworker: no WORKER code reference supplied" unless $worker;

    my $count = $self->{N};
    croak "setworker: N must be a positive integer"
        unless defined $count && $count =~ /\A[0-9]+\z/ && $count > 0;

    $self->{WORKER} = $worker;
    if (@params) {
        $self->{WORK_PARAMS} = \@params;
    }
    else {
        $self->{WORK_PARAMS} ||= [];    # Make it empty, if undef
    }

    for my $nbr ( 0 .. $count - 1 ) {

        # Set before creating the thread, so that the thread's copy of
        # the object carries its own number.
        $self->{THISTHREADNBR} = $nbr;
        $self->{CONTROL_QUEUE}[$nbr] = Thread::Queue->new;    # Send CONTROL messages to worker
        push @{ $self->{THREADPOOL} },
            threads->create( \&_internal_worker, $self );
    }
    return;
}
# Asks every worker to pause.
#
# Parameters:
#   $msg        - control message sent to each worker (defaults to "PAUSE")
#   $autoresume - when true, Resume_workers() is called before returning
#   $delay      - seconds to sleep after pausing (skipped when false)
#
# Croaks when the pool is already paused.
sub Pause_workers {
    my ( $self, $msg, $autoresume, $delay ) = @_;
    croak "Misuse of PAUSE - already Paused" if $self->{STATE} eq "PAUSED";

    # Workers poll their control queue without blocking and treat an
    # undefined value as "no message, time to quit", so the pause message
    # itself must always be defined.
    $msg = "PAUSE" unless defined $msg;

    $_->enqueue($msg) for @{ $self->{CONTROL_QUEUE} };

    # Tell them to look at CTRL: one undef per worker at the head of the
    # request queue.
    $self->{REQUEST_QUEUE}->insert( 0, (undef) x $self->{N} );
    $self->{STATE} = "PAUSED";

    sleep $delay if $delay;
    $self->Resume_workers() if $autoresume;
    return;
}
# Releases workers that were suspended by Pause_workers().
#
# Croaks when the pool is not currently paused.
sub Resume_workers {
    my $self = shift;
    croak "Misuse of RESUME - we are NOT Paused"
        unless $self->{STATE} eq "PAUSED";

    # A paused worker blocks on its control queue; any item (here undef)
    # lets it continue.
    $_->enqueue(undef) for @{ $self->{CONTROL_QUEUE} };    # Resumes workers
    $self->{STATE} = "RUNNING";
    return;
}
# Body of every worker thread: fetches work units until getwork()
# returns undef, runs the user's WORKER on each one and queues whatever
# the worker returns.
sub _internal_worker {
    my $self = shift;

    # Test for definedness, not truth: work units such as 0 or "" are
    # legitimate and must not end the thread. A lexical is used so the
    # user's worker cannot clobber the loop variable through $_.
    while ( defined( my $work = $self->getwork() ) ) {
        $self->pushresult(
            $self->{WORKER}->( $work, @{ $self->{WORK_PARAMS} } ) );
    }
    return;
}
# Queues one or more units of work for the worker threads.
# Every argument after the object becomes a separate queue item.
sub feedwork {
    my ( $pool, @work_units ) = @_;
    return $pool->{REQUEST_QUEUE}->enqueue(@work_units);
}
# Called inside a worker thread to obtain its next unit of work.
#
# Blocks on the request queue. A defined item is work and is returned.
# An undef item means "check your control queue":
#   - a pending control message pauses this worker until the next
#     control item arrives, after which it looks for work again;
#   - no control message means the main thread wants the worker to
#     quit: the thread's statistics are queued and nothing is returned.
#
# Returns the work unit, or undef (empty list in list context) when the
# worker should stop.
sub getwork {
    my $self = shift;

    while (1) {
        my $currentwork = $self->GetNextITEM("REQUEST_QUEUE");
        if ( defined $currentwork ) {    # There is work to be done ..
            $self->{WORKDONE}++;
            return $currentwork;
        }

        # No current work - look for CONTROL msgs
        my $control      = $self->{CONTROL_QUEUE}[ $self->{THISTHREADNBR} ];
        my $control_work = $control->dequeue_nb;    # Non Blk
        last unless defined $control_work;

        $self->{CONTROLMSGS}++;

        # Interpret/process the current message here.
        # Then wait for the next message before proceeding.
        $control->dequeue;                          # Blk
    }

    # Main thread is telling us to quit.
    # Enqueued item MUST BE SIMPLE SCALAR (No aref etc).
    # Counters that were never incremented are reported as 0 rather than
    # interpolated as undef.
    $self->{STATS_QUEUE}->enqueue(
        join ',',
        $self->{THISTHREADNBR},
        $self->{WORKDONE}    || 0,
        $self->{CONTROLMSGS} || 0
    );
    return;    # Tell worker to quit
}
# Places the values produced by a worker on the result queue.
# Every argument after the object becomes a separate queue item.
sub pushresult {
    my ( $pool, @produced ) = @_;
    return $pool->{RESULT_QUEUE}->enqueue(@produced);
}
# Pulls the next item from one of the object's queues.
#
# Parameters:
#   $itemname - key of the queue on the object (e.g. "REQUEST_QUEUE")
#   $blocking - optional; defaults to 1 (block until an item arrives).
#               Pass 0 for a non-blocking read.
#
# Returns the dequeued item; a non-blocking read returns undef when the
# queue is empty. Croaks when the object has no queue under that key.
sub GetNextITEM {
    my ( $self, $itemname, $blocking ) = @_;
    $blocking = 1 unless defined $blocking;    # Default is to BLOCK

    my $queue = $self->{$itemname};
    croak "GetNextITEM: no queue named '"
        . ( defined $itemname ? $itemname : '' ) . "'"
        unless $queue;

    return $blocking ? $queue->dequeue : $queue->dequeue_nb;
}
# Tells all workers to quit, waits for them to finish and collects what
# they produced.
#
# Returns the list of queued results in list context; returns undef in
# scalar context (the results then stay on the result queue).
sub Quit_And_Get_Results {
    my $self = shift;

    $self->feedwork( (undef) x $self->{N} );    # Tell them to quit
    $_->join for @{ $self->{THREADPOOL} };      # Wait for kids
    return unless wantarray;

    # Drain by item count rather than by truth of the item, so that
    # results such as 0, "" or undef neither end the collection early
    # nor get dropped.
    my $queue = $self->{RESULT_QUEUE};
    my @results;
    push @results, scalar( $queue->dequeue_nb ) while $queue->pending;
    return @results;
}
# Prints per-thread statistics collected from the stats queue, followed
# by a distribution table showing how many threads processed how many
# work units.
#
# Each queued statistic is a "thread,workdone,controlmsgs" string.
# Output goes to the currently selected filehandle.
sub print_stats {
    my $self = shift;

    my @results;
    while ( defined( my $stat = $self->GetNextITEM( "STATS_QUEUE", 0 ) ) )
    {    # Use NON-BLK
        push @results, $stat;
    }

    # Order by thread number numerically; a plain string sort would list
    # thread 10 before thread 2.
    my $thread_nbr = sub {
        my ($nbr) = split /,/, $_[0];
        return $nbr || 0;
    };
    print "STATS: Thread $_\n"
        for sort { $thread_nbr->($a) <=> $thread_nbr->($b) } @results;

    my $count = scalar @results;
    my @freq;    # $freq[$n] = number of threads that did $n work units
    my $totaltrans = 0;
    for my $stat (@results) {
        my ( undef, $wrk ) = split /,/, $stat;
        $wrk ||= 0;    # An empty field means the thread did no work
        $freq[$wrk]++;
        $totaltrans += $wrk;
    }

    my ( $cumul, $cumul_th ) = ( 0, 0 );
    for my $idx ( 0 .. $#freq ) {
        next unless $freq[$idx];
        $cumul_th += $freq[$idx];
        $cumul    += $freq[$idx] * $idx;

        # Avoid dividing by zero when no thread processed anything.
        my $share = $totaltrans ? $cumul * 100 / $totaltrans : 0;
        printf
            "%3d = %3.1f%% Did %5d (%5d Cumul). %3d Threads did %3.1f %% of Trans.)\n",
            $freq[$idx], $freq[$idx] * 100 / $count, $idx, $cumul,
            $cumul_th, $share;
    }
    return;
}
1;
__END__
=head1 NAME
Thread::Simple - A simple thread-pool implementation
=head1 SYNOPSIS
use Thread::Simple;
=head1 DESCRIPTION
C<Thread::Simple> provides a simple thread-pool implementation
without external dependencies outside core modules.
Jobs can be submitted to and handled by multi-threaded `workers'
managed by the pool.
The processing model is that a series of individual units of data
needs to be processed, in parallel. The result of processing (if present)
can be returned, and will be collected.
This module hides the "threads" and "Thread::Queue" objects that are used
in its implementation. The caller/user sees only a highly simplified
interface.
=head1 SYNOPSIS
use Thread::Simple;
my $t = new Thread::Simple (N=>5, WORKER=> \&worksub); # 5 threads
while( <get a unit of information for thread to work on> ) {
$t->feedwork( $_ );
}
# Get and process results
for ( $t->Quit_And_Get_Results() ){
print "$_\n" ; # Or whatever you want to do with them
}
sub worksub{ # This is called by each thread
my $work = shift; # Get one unit of work
.. process the $work
return "Some (optional) scalar result of processing $work";
}
=head1 Selected METHODs
new(N=><count>, WORKER=><subref>, <Other optional params>)
Creates a Thread::Simple object.
feedwork( $work_to_process)
Adds to the queue of work to be done.
Quit_And_Get_Results()
Returns an array of results produced.
The following methods are optional, for more control over processing:
print_stats() can be called after Quit_And_Get_Results(), to print out how
much work each thread did (for manual optimization of the N param).
Pause_workers() and Resume_workers() can temporarily suspend/resume work.
=head1 BUGS
#Note:
# If you see the error message:
# Attempt to free unreferenced scalar: SV ..., Perl interpreter: ... during global destruction.
# it is due to a bug in threads. This bug is fixed in threads 1.63.
=head1 AUTHOR
NetWallah
=head1 COPYRIGHT AND LICENSE
Copyleft 2008
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut