#! perl -slw package Main; use strict; use threads; use threads::shared; use Thread::Queue; our %nodes :shared; my $DEBUG = 0; my $semStdout :shared; $|++; sub tprint{ my $str = shift; my $tid = threads->tid(); my $prefix = "Thread " . $tid . " "; $prefix = "MAIN " if $tid == 0; $prefix = "QUEUE " if $tid == 1; lock $semStdout; print $prefix . $str; } sub debug{ my $str = shift; tprint($str) if $DEBUG; } our $NUMNODES = 100; sub worker1 { my $tid = threads->tid; my( $jobQueue, $Qwork, $Qresults ) = @_; for my $num (1..$NUMNODES) { my $node = Node->new("Node-$tid-$num"); $jobQueue->enqueueJob($node); # In my program some commands are run in the same thread # and others are pushed to a background worker thread, # this is just meant to simulate that if(int(rand(2))){ ## Process $work to produce $result ## my $result = $node->getNodeString(); $Qresults->enqueue( $result ); }else{ $Qwork->enqueue($node); } } $Qwork->enqueue( (undef) ); #Signal other pool we are done sending nodes $Qresults->enqueue( undef ); ## Signal this thread is finished } sub worker2 { my $tid = threads->tid; my( $Qwork, $Qresults ) = @_; while( my $node = $Qwork->dequeue ) { my $result; ## Process $work to produce $result ## $result = "From secondary: " . $node->getNodeString(); $Qresults->enqueue( $result ); } $Qresults->enqueue( undef ); ## Signal this thread is finished } our $THREADS = 50; my $Qwork = new Thread::Queue; my $Qresults = new Thread::Queue; my $jobQueue = new Queue; #start the main jobQueue thread $jobQueue->startThread(); ## Create the pool of workers my @pool1 = map{ threads->create( \&worker1, $jobQueue, $Qwork, $Qresults ) } 1 .. $THREADS; ## Create the secondary pool of workers my @pool2 = map{ threads->create( \&worker2, $Qwork, $Qresults ) } 1 .. $THREADS; ## Process the results as they become available ## until all the workers say they are finished. my $numResults = 0; for ( 1 .. $THREADS * 2) { while( my $result = $Qresults->dequeue ) { ## Do something with the result ## tprint "RESULT= " . $result; $numResults++; } } print "Joining threads\n"; ## Clean up the threads $_->join for @pool1; $_->join for @pool2; $jobQueue->killThread()->join; if($numResults != $THREADS*$NUMNODES){ print "Only $numResults/" . $THREADS*$NUMNODES . " nodes were returned"; }else{ print "All nodes returned successfully"; } ################################################################################# package Queue; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; my $singleton; my $queue; my $jobNo; sub new { my $class = shift; $singleton = { thread => undef, threadDone => 0, }; share($singleton->{threadDone}); $queue = Thread::Queue->new(); $jobNo = 0; bless $singleton, $class; return $singleton; } sub getQueue{ return $singleton; } sub enqueueJob{ my $self = shift; my $node = shift; my $tid = threads->tid(); $queue->enqueue($node); my $results; 1 until do{ lock %Main::nodes; $results = $Main::nodes{ $tid } }; foreach my $key(keys(%$results)){ $node->{$key} = $results->{$key}; } delete( $Main::nodes{ $tid }); } sub startThread{ my $self = shift; $self->{thread} = threads->create(\&manageQueueHelper); } sub manageQueueHelper{ $singleton->manageQueue(); } sub killThread{ my $self = shift; { lock $self->{threadDone}; $self->{threadDone} = 1; } Main::debug("Killing Queue"); return $self->{thread}; } sub manageQueue{ my $self = shift; Main::debug("Queue started"); while(1){ my $done; { lock $self->{threadDone}; $done = $self->{threadDone}; } my $amtInQ = $queue->pending(); last if $done && $amtInQ == 0; Main::debug("$amtInQ threads in queue, done=" . $done); #slow it down, not useful for testing though #sleep 1; if($amtInQ == 0){ next; }else{ for(my $i = 0; $i < $amtInQ; $i++){ my $node = $queue->peek($i); #Why is this ever undefined? next if(!defined($node)); Main::debug($node->{name} . " can be set=" . $node->{canBeSet}); if(int($node->{canBeSet}) == 0){ # Just to simulate how in our program just because its a node's turn # in the queue, doesnt mean that its really ready to be used # Just a fake way to do it. It will go next iteration. Main::debug($node->{name} . " not ready to be set"); $node->{canBeSet} = 1; }else{ my $random = rand(10000); Main::debug("Setting $node->{name} to jobnumber $jobNo with random string $random"); my %result :shared= ( jobNo => $jobNo, random => $random, ); { lock %Main::nodes; ${Main::nodes}{$node->{tid}} = \%result; } $queue->extract($i); $jobNo++; } } } } } 1; ################################################################################# # Node Object package Node; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; sub new{ my $class = shift; my $name = shift; my $self = { tid => threads->tid(), name => $name, canBeSet => int(rand(2)), }; Main::debug("Created node $name "); bless ($self, $class); return $self; } sub getNodeString{ my $self = shift; return "Node: $self->{name}, JobNo=$self->{jobNo}, random=$self->{random}"; } 1;