OK, so I have written a smaller version that pretty much captures what happens in my real program, and I have done it using a shared hash as you recommended. The program simulates how we have one thread pool for client connections, which runs work directly in those threads, and another thread pool for background work. I think it works well, but I am curious whether I am doing anything improperly, or whether anything could be improved upon.
I did have to add this at line 205, and I am not sure why. I don't know whether there is something wrong in the code or not, but it doesn't seem like it should be necessary.
#! perl -slw
package Main;
use strict;
use threads;
use threads::shared;
use Thread::Queue;
# Shared state used across threads:
#   %nodes     - per-thread result slots, keyed by thread id
#   $semStdout - lock target that serialises output in tprint()
our %nodes :shared;
my $semStdout :shared;
# Set to a true value to make debug() print its messages.
my $DEBUG = 0;
# Turn on autoflush for the currently selected output handle.
$| = 1;
# Print a message prefixed with a label for the calling thread.
# Thread id 0 is labelled "MAIN ", thread id 1 is labelled "QUEUE ",
# and every other thread is labelled "Thread <tid> ".
# Output is serialised by locking $semStdout for the duration of the print.
sub tprint {
    my ($message) = @_;
    my $tid = threads->tid();
    my $label
        = $tid == 1 ? "QUEUE "
        : $tid == 0 ? "MAIN "
        :             "Thread " . $tid . " ";
    lock $semStdout;
    print $label . $message;
}
# Forward a message to tprint() only when $DEBUG is true.
sub debug {
    my ($message) = @_;
    if ($DEBUG) {
        tprint($message);
    }
}
# Number of nodes each worker1 thread creates and pushes through the job queue.
our $NUMNODES = 100;
# Primary-pool worker.
#
# Arguments:
#   $jobQueue - the Queue object; enqueueJob() blocks until the queue thread
#               has filled in the node's job data
#   $Qwork    - Thread::Queue feeding the secondary pool (worker2)
#   $Qresults - Thread::Queue read by the main thread
#
# Creates $NUMNODES nodes.  Each node is passed through the job queue and is
# then either turned into a result string in this thread or handed to the
# secondary pool, chosen at random.  On completion one undef is pushed to
# each queue as an end-of-work marker.
sub worker1 {
    my ( $jobQueue, $Qwork, $Qresults ) = @_;
    my $tid = threads->tid;
    for my $num ( 1 .. $NUMNODES ) {
        my $node = Node->new("Node-$tid-$num");
        $jobQueue->enqueueJob($node);

        # In my program some commands are run in the same thread
        # and others are pushed to a background worker thread,
        # this is just meant to simulate that
        if ( int( rand(2) ) ) {
            ## Process $work to produce $result ##
            my $result = $node->getNodeString();
            $Qresults->enqueue($result);
        } else {
            $Qwork->enqueue($node);
        }
    }

    # Signal other pool we are done sending nodes
    $Qwork->enqueue(undef);

    ## Signal this thread is finished
    $Qresults->enqueue(undef);
}
# Secondary-pool worker.
#
# Pulls nodes from $Qwork until a false item (the undef end marker) arrives,
# pushing "From secondary: <node string>" onto $Qresults for each node.
# Finally pushes undef onto $Qresults to report that this thread is finished.
sub worker2 {
    my ( $Qwork, $Qresults ) = @_;
    NODE:
    while (1) {
        my $node = $Qwork->dequeue;
        last NODE if !$node;
        ## Process $work to produce $result ##
        $Qresults->enqueue( "From secondary: " . $node->getNodeString() );
    }
    $Qresults->enqueue(undef);    ## Signal this thread is finished
}
# Number of threads in EACH pool (primary and secondary).
our $THREADS = 50;

# Direct method calls instead of indirect object syntax ("new Class"),
# which is parsed heuristically and can silently mis-parse.
my $Qwork    = Thread::Queue->new;
my $Qresults = Thread::Queue->new;
my $jobQueue = Queue->new;

#start the main jobQueue thread
$jobQueue->startThread();

## Create the pool of workers
my @pool1 = map {
    threads->create( \&worker1, $jobQueue, $Qwork, $Qresults )
} 1 .. $THREADS;

## Create the secondary pool of workers
my @pool2 = map {
    threads->create( \&worker2, $Qwork, $Qresults )
} 1 .. $THREADS;

## Process the results as they become available
## until all the workers say they are finished.
## Every worker in both pools pushes one undef when it is done, so we
## expect $THREADS * 2 end markers in total.
my $numResults = 0;
for ( 1 .. $THREADS * 2 ) {
    # Test definedness so that only the undef marker ends the inner loop,
    # never a result that merely happens to be false.
    while ( defined( my $result = $Qresults->dequeue ) ) {
        ## Do something with the result ##
        tprint( "RESULT= " . $result );
        $numResults++;
    }
}
print "Joining threads\n";

## Clean up the threads
$_->join for @pool1;
$_->join for @pool2;
$jobQueue->killThread()->join;

# Each worker1 thread yields exactly one result per node, either directly
# or via the secondary pool.
my $expected = $THREADS * $NUMNODES;
if ( $numResults != $expected ) {
    print "Only $numResults/$expected nodes were returned";
} else {
    print "All nodes returned successfully";
}
#################################################################################
package Queue;
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Singleton job queue.
#
# Worker threads call enqueueJob() and block until the single queue thread
# (started with startThread()) has assigned their node a job number and a
# random value.  Results are handed back through %Main::nodes, keyed by the
# thread id recorded in the node.

my $singleton;    # the one Queue object
my $queue;        # Thread::Queue holding nodes awaiting a job number
my $jobNo;        # next job number to hand out (used by the queue thread)

# Create (or re-create) the singleton and reset the pending queue and the
# job counter.  Returns the blessed singleton.
sub new {
    my $class = shift;
    $singleton = {
        thread     => undef,
        threadDone => 0,
    };

    # Only the shutdown flag has to be visible to both threads.
    share( $singleton->{threadDone} );
    $queue = Thread::Queue->new();
    $jobNo = 0;
    bless $singleton, $class;
    return $singleton;
}

# Return the singleton created by new().
sub getQueue {
    return $singleton;
}

# Submit $node and block until the queue thread has published a result for
# the calling thread; the result's key/value pairs are copied into $node.
#
# The result slot is keyed by the caller's thread id while the queue thread
# files it under $node->{tid}, so the node is expected to have been created
# in the calling thread.
sub enqueueJob {
    my $self = shift;
    my $node = shift;
    my $tid  = threads->tid();
    $queue->enqueue($node);

    # Sleep on the condition variable instead of spinning on the lock.
    # The predicate is checked under the lock first, so a broadcast sent
    # before we got here is not lost; the loop covers spurious wake-ups.
    lock %Main::nodes;
    cond_wait(%Main::nodes) until $Main::nodes{$tid};

    my $results = $Main::nodes{$tid};
    foreach my $key ( keys %$results ) {
        $node->{$key} = $results->{$key};
    }

    # Still holding the lock, so clearing the slot cannot race with the
    # queue thread writing to the shared hash.
    delete $Main::nodes{$tid};
}

# Start the queue thread and remember its thread object.
sub startThread {
    my $self = shift;
    $self->{thread} = threads->create( \&manageQueueHelper );
}

# Thread entry point: runs manageQueue() on the singleton.
sub manageQueueHelper {
    $singleton->manageQueue();
}

# Ask the queue thread to stop once the queue is empty.  Returns the thread
# object so the caller can join it.
sub killThread {
    my $self = shift;
    {
        lock $self->{threadDone};
        $self->{threadDone} = 1;
    }
    Main::debug("Killing Queue");
    return $self->{thread};
}

# Main loop of the queue thread.
#
# Repeatedly walks the pending nodes.  A node whose canBeSet flag is 0 is
# flagged ready and left in place for a later pass; any other node gets a
# job number and a random value published to %Main::nodes and is removed
# from the queue.  The loop ends when killThread() has been called and the
# queue is empty.
sub manageQueue {
    my $self = shift;
    Main::debug("Queue started");
    while (1) {
        my $done;
        {
            lock $self->{threadDone};
            $done = $self->{threadDone};
        }
        my $amtInQ = $queue->pending();
        last if $done && $amtInQ == 0;
        Main::debug( "$amtInQ threads in queue, done=" . $done );

        #slow it down, not useful for testing though
        #sleep 1;
        if ( $amtInQ == 0 ) {
            # Nothing to do: give other threads a chance to run rather
            # than spinning flat out.
            threads->yield();
            next;
        }

        # extract() removes an element and shifts the following ones down.
        # The index therefore only advances when the current element stays
        # in the queue, and the pass length shrinks with every removal.
        # Stepping a plain 0..$amtInQ-1 counter past removed items is what
        # made peek() return undef and skipped the element that slid into
        # the freed slot.
        my $i = 0;
        while ( $i < $amtInQ ) {
            my $node = $queue->peek($i);
            last if !defined $node;    # defensive; should not happen now

            Main::debug( $node->{name} . " can be set=" . $node->{canBeSet} );
            if ( int( $node->{canBeSet} ) == 0 ) {
                # Just to simulate how in our program just because its a
                # node's turn in the queue, doesnt mean that its really
                # ready to be used.
                # Just a fake way to do it. It will go next iteration.
                Main::debug( $node->{name} . " not ready to be set" );
                $node->{canBeSet} = 1;
                $i++;
                next;
            }

            my $random = rand(10000);
            Main::debug("Setting $node->{name} to jobnumber $jobNo with random string $random");
            my %result :shared = (
                jobNo  => $jobNo,
                random => $random,
            );
            {
                lock %Main::nodes;
                $Main::nodes{ $node->{tid} } = \%result;

                # Wake every waiter; each re-checks its own slot.
                cond_broadcast(%Main::nodes);
            }
            $queue->extract($i);
            $amtInQ--;
            $jobNo++;
        }
    }
}
1;
#################################################################################
# Node Object
package Node;
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Create a node.
#
# Arguments:
#   $name - display name of the node
#
# The node records the id of the creating thread in {tid} and a random
# 0/1 readiness flag in {canBeSet}.
sub new {
    my $class = shift;
    my $name  = shift;
    my $self  = {
        tid      => threads->tid(),
        name     => $name,
        canBeSet => int( rand(2) ),
    };
    Main::debug("Created node $name ");
    bless( $self, $class );
    return $self;
}

# Return a one-line description of the node built from its {name}, {jobNo}
# and {random} fields.
sub getNodeString {
    my $self = shift;

    # Kept on one line: a line break inside the interpolated hash subscript
    # changes how the key is parsed.
    return "Node: $self->{name}, JobNo=$self->{jobNo}, random=$self->{random}";
}
1;