Beefy Boxes and Bandwidth Generously Provided by pair Networks
There's more than one way to do things
 
PerlMonks  

comment on

( [id://3333]=superdoc: print w/replies, xml ) Need Help??

Ok so i have written a smaller version the pretty much captures what happens in my real program, and I have done it using a shared hash as you recommended. The program simulates how we have one threadpool for client connections and running work in directly in them, and another thread pool for background work. I think it works well, but am curious if I am doing anything improperly, or if anything could be improved upon.

I did have to add this at line 205, and am not sure why. I dont know if there is something wrong in the code or not, but it doesnt seem like it should be necessary.

#next if(!defined($node));

Anyways, here is my code.

#! perl -slw package Main; use strict; use threads; use threads::shared; use Thread::Queue; our %nodes :shared; my $DEBUG = 0; my $semStdout :shared; $|++; sub tprint{ my $str = shift; my $tid = threads->tid(); my $prefix = "Thread " . $tid . " "; $prefix = "MAIN " if $tid == 0; $prefix = "QUEUE " if $tid == 1; lock $semStdout; print $prefix . $str; } sub debug{ my $str = shift; tprint($str) if $DEBUG; } our $NUMNODES = 100; sub worker1 { my $tid = threads->tid; my( $jobQueue, $Qwork, $Qresults ) = @_; for my $num (1..$NUMNODES) { my $node = Node->new("Node-$tid-$num"); $jobQueue->enqueueJob($node); # In my program some commands are run in the same thread # and others are pushed to a background worker thread, # this is just meant to simulate that if(int(rand(2))){ ## Process $work to produce $result ## my $result = $node->getNodeString(); $Qresults->enqueue( $result ); }else{ $Qwork->enqueue($node); } } $Qwork->enqueue( (undef) ); #Signal other pool we are done sendin +g nodes $Qresults->enqueue( undef ); ## Signal this thread is finished } sub worker2 { my $tid = threads->tid; my( $Qwork, $Qresults ) = @_; while( my $node = $Qwork->dequeue ) { my $result; ## Process $work to produce $result ## $result = "From secondary: " . $node->getNodeString(); $Qresults->enqueue( $result ); } $Qresults->enqueue( undef ); ## Signal this thread is finished } our $THREADS = 50; my $Qwork = new Thread::Queue; my $Qresults = new Thread::Queue; my $jobQueue = new Queue; #start the main jobQueue thread $jobQueue->startThread(); ## Create the pool of workers my @pool1 = map{ threads->create( \&worker1, $jobQueue, $Qwork, $Qresults ) } 1 .. $THREADS; ## Create the secondary pool of workers my @pool2 = map{ threads->create( \&worker2, $Qwork, $Qresults ) } 1 .. $THREADS; ## Process the results as they become available ## until all the workers say they are finished. my $numResults = 0; for ( 1 .. $THREADS * 2) { while( my $result = $Qresults->dequeue ) { ## Do something with the result ## tprint "RESULT= " . $result; $numResults++; } } print "Joining threads\n"; ## Clean up the threads $_->join for @pool1; $_->join for @pool2; $jobQueue->killThread()->join; if($numResults != $THREADS*$NUMNODES){ print "Only $numResults/" . $THREADS*$NUMNODES . " nodes were ret +urned"; }else{ print "All nodes returned successfully"; } ###################################################################### +########### package Queue; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; my $singleton; my $queue; my $jobNo; sub new { my $class = shift; $singleton = { thread => undef, threadDone => 0, }; share($singleton->{threadDone}); $queue = Thread::Queue->new(); $jobNo = 0; bless $singleton, $class; return $singleton; } sub getQueue{ return $singleton; } sub enqueueJob{ my $self = shift; my $node = shift; my $tid = threads->tid(); $queue->enqueue($node); my $results; 1 until do{ lock %Main::nodes; $results = $Main::nodes{ $tid } }; foreach my $key(keys(%$results)){ $node->{$key} = $results->{$key}; } delete( $Main::nodes{ $tid }); } sub startThread{ my $self = shift; $self->{thread} = threads->create(\&manageQueueHelper); } sub manageQueueHelper{ $singleton->manageQueue(); } sub killThread{ my $self = shift; { lock $self->{threadDone}; $self->{threadDone} = 1; } Main::debug("Killing Queue"); return $self->{thread}; } sub manageQueue{ my $self = shift; Main::debug("Queue started"); while(1){ my $done; { lock $self->{threadDone}; $done = $self->{threadDone}; } my $amtInQ = $queue->pending(); last if $done && $amtInQ == 0; Main::debug("$amtInQ threads in queue, done=" . $done); #slow it down, not useful for testing though #sleep 1; if($amtInQ == 0){ next; }else{ for(my $i = 0; $i < $amtInQ; $i++){ my $node = $queue->peek($i); #Why is this ever undefined? next if(!defined($node)); Main::debug($node->{name} . " can be set=" . $node->{c +anBeSet}); if(int($node->{canBeSet}) == 0){ # Just to simulate how in our program just because + its a node's turn # in the queue, doesnt mean that its really ready +to be used # Just a fake way to do it. It will go next iterat +ion. Main::debug($node->{name} . " not ready to be set" +); $node->{canBeSet} = 1; }else{ my $random = rand(10000); Main::debug("Setting $node->{name} to jobnumber $j +obNo with random string $random"); my %result :shared= ( jobNo => $jobNo, random => $random, ); { lock %Main::nodes; ${Main::nodes}{$node->{tid}} = \%result; } $queue->extract($i); $jobNo++; } } } } } 1; ###################################################################### +########### # Node Object package Node; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; sub new{ my $class = shift; my $name = shift; my $self = { tid => threads->tid(), name => $name, canBeSet => int(rand(2)), }; Main::debug("Created node $name "); bless ($self, $class); return $self; } sub getNodeString{ my $self = shift; return "Node: $self->{name}, JobNo=$self->{jobNo}, random=$self->{ +random}"; } 1;

In reply to Re^4: Thread terminating abnormally COND_SIGNAL(6) by rmahin
in thread Thread terminating abnormally COND_SIGNAL(6) by rmahin

Title:
Use:  <p> text here (a paragraph) </p>
and:  <code> code here </code>
to format your post; it's "PerlMonks-approved HTML":



  • Are you posting in the right place? Check out Where do I post X? to know for sure.
  • Posts may use any of the Perl Monks Approved HTML tags. Currently these include the following:
    <code> <a> <b> <big> <blockquote> <br /> <dd> <dl> <dt> <em> <font> <h1> <h2> <h3> <h4> <h5> <h6> <hr /> <i> <li> <nbsp> <ol> <p> <small> <strike> <strong> <sub> <sup> <table> <td> <th> <tr> <tt> <u> <ul>
  • Snippets of code should be wrapped in <code> tags not <pre> tags. In fact, <pre> tags should generally be avoided. If they must be used, extreme care should be taken to ensure that their contents do not have long lines (<70 chars), in order to prevent horizontal scrolling (and possible janitor intervention).
  • Want more info? How to link or How to display code and escape characters are good places to start.
Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others having an uproarious good time at the Monastery: (3)
As of 2024-04-24 22:15 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found