http://www.perlmonks.org?node_id=1044675


in reply to Re^3: Thread terminating abnormally COND_SIGNAL(6)
in thread Thread terminating abnormally COND_SIGNAL(6)

Ok, so I have written a smaller version that pretty much captures what happens in my real program, and I have done it using a shared hash as you recommended. The program simulates how we have one thread pool for client connections that runs work directly in those threads, and another thread pool for background work. I think it works well, but am curious if I am doing anything improperly, or if anything could be improved upon.

I did have to add this at line 205, and I am not sure why. I don't know if there is something wrong in the code or not, but it doesn't seem like it should be necessary.

#next if(!defined($node));

Anyway, here is my code.

#! perl -slw
package Main;
use strict;
use threads;
use threads::shared;
use Thread::Queue;

our %nodes :shared;
my $DEBUG = 0;
my $semStdout :shared;
$|++;

sub tprint{
    my $str = shift;
    my $tid = threads->tid();
    my $prefix = "Thread " . $tid . " ";
    $prefix = "MAIN " if $tid == 0;
    $prefix = "QUEUE " if $tid == 1;
    lock $semStdout;
    print $prefix . $str;
}

sub debug{
    my $str = shift;
    tprint($str) if $DEBUG;
}

our $NUMNODES = 100;

sub worker1 {
    my $tid = threads->tid;
    my( $jobQueue, $Qwork, $Qresults ) = @_;
    for my $num (1..$NUMNODES) {
        my $node = Node->new("Node-$tid-$num");
        $jobQueue->enqueueJob($node);
        # In my program some commands are run in the same thread
        # and others are pushed to a background worker thread,
        # this is just meant to simulate that
        if(int(rand(2))){
            ## Process $work to produce $result ##
            my $result = $node->getNodeString();
            $Qresults->enqueue( $result );
        }else{
            $Qwork->enqueue($node);
        }
    }
    $Qwork->enqueue( (undef) );  # Signal other pool we are done sending nodes
    $Qresults->enqueue( undef ); ## Signal this thread is finished
}

sub worker2 {
    my $tid = threads->tid;
    my( $Qwork, $Qresults ) = @_;
    while( my $node = $Qwork->dequeue ) {
        my $result;
        ## Process $work to produce $result ##
        $result = "From secondary: " . $node->getNodeString();
        $Qresults->enqueue( $result );
    }
    $Qresults->enqueue( undef ); ## Signal this thread is finished
}

our $THREADS = 50;
my $Qwork = new Thread::Queue;
my $Qresults = new Thread::Queue;
my $jobQueue = new Queue;

# start the main jobQueue thread
$jobQueue->startThread();

## Create the pool of workers
my @pool1 = map{
    threads->create( \&worker1, $jobQueue, $Qwork, $Qresults )
} 1 .. $THREADS;

## Create the secondary pool of workers
my @pool2 = map{
    threads->create( \&worker2, $Qwork, $Qresults )
} 1 .. $THREADS;

## Process the results as they become available
## until all the workers say they are finished.
my $numResults = 0;
for ( 1 .. $THREADS * 2) {
    while( my $result = $Qresults->dequeue ) {
        ## Do something with the result ##
        tprint "RESULT= " . $result;
        $numResults++;
    }
}

print "Joining threads\n";

## Clean up the threads
$_->join for @pool1;
$_->join for @pool2;
$jobQueue->killThread()->join;

if($numResults != $THREADS*$NUMNODES){
    print "Only $numResults/" . $THREADS*$NUMNODES . " nodes were returned";
}else{
    print "All nodes returned successfully";
}

#################################################################################

package Queue;
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $singleton;
my $queue;
my $jobNo;

sub new {
    my $class = shift;
    $singleton = {
        thread     => undef,
        threadDone => 0,
    };
    share($singleton->{threadDone});
    $queue = Thread::Queue->new();
    $jobNo = 0;
    bless $singleton, $class;
    return $singleton;
}

sub getQueue{
    return $singleton;
}

sub enqueueJob{
    my $self = shift;
    my $node = shift;
    my $tid = threads->tid();
    $queue->enqueue($node);
    my $results;
    1 until do{ lock %Main::nodes; $results = $Main::nodes{ $tid } };
    foreach my $key(keys(%$results)){
        $node->{$key} = $results->{$key};
    }
    delete( $Main::nodes{ $tid });
}

sub startThread{
    my $self = shift;
    $self->{thread} = threads->create(\&manageQueueHelper);
}

sub manageQueueHelper{
    $singleton->manageQueue();
}

sub killThread{
    my $self = shift;
    {
        lock $self->{threadDone};
        $self->{threadDone} = 1;
    }
    Main::debug("Killing Queue");
    return $self->{thread};
}

sub manageQueue{
    my $self = shift;
    Main::debug("Queue started");
    while(1){
        my $done;
        {
            lock $self->{threadDone};
            $done = $self->{threadDone};
        }
        my $amtInQ = $queue->pending();
        last if $done && $amtInQ == 0;
        Main::debug("$amtInQ threads in queue, done=" . $done);
        # slow it down, not useful for testing though
        #sleep 1;
        if($amtInQ == 0){
            next;
        }else{
            for(my $i = 0; $i < $amtInQ; $i++){
                my $node = $queue->peek($i);
                # Why is this ever undefined?
                next if(!defined($node));
                Main::debug($node->{name} . " can be set=" . $node->{canBeSet});
                if(int($node->{canBeSet}) == 0){
                    # Just to simulate how in our program just because its a node's turn
                    # in the queue, doesnt mean that its really ready to be used.
                    # Just a fake way to do it. It will go next iteration.
                    Main::debug($node->{name} . " not ready to be set");
                    $node->{canBeSet} = 1;
                }else{
                    my $random = rand(10000);
                    Main::debug("Setting $node->{name} to jobnumber $jobNo with random string $random");
                    my %result :shared = (
                        jobNo  => $jobNo,
                        random => $random,
                    );
                    {
                        lock %Main::nodes;
                        ${Main::nodes}{$node->{tid}} = \%result;
                    }
                    $queue->extract($i);
                    $jobNo++;
                }
            }
        }
    }
}
1;

#################################################################################
# Node Object
package Node;
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

sub new{
    my $class = shift;
    my $name = shift;
    my $self = {
        tid      => threads->tid(),
        name     => $name,
        canBeSet => int(rand(2)),
    };
    Main::debug("Created node $name ");
    bless ($self, $class);
    return $self;
}

sub getNodeString{
    my $self = shift;
    return "Node: $self->{name}, JobNo=$self->{jobNo}, random=$self->{random}";
}
1;

Re^5: Thread terminating abnormally COND_SIGNAL(6)
by BrowserUk (Patriarch) on Jul 17, 2013 at 01:53 UTC
    I did have to add this at line 205, and I am not sure why. I don't know if there is something wrong in the code or not, but it doesn't seem like it should be necessary. #next if(!defined($node));

    It'll take me a while to digest the entirety of your code, but I think I can answer this one straight away.

    The short answer is that you've fallen into the trap of using the so-called "Advanced Methods" of Thread::Queue that were added by that module's newest owner.

    IMO these methods -- peek(), insert() & extract() -- should never have been added to a Queue module, as they break all the basic invariants (and thus expectations) of queues. They effectively turn the queue into a (shared) array. Which is nonsense, because the underlying data structure already is an array, and all this does is make it a very expensive array to maintain.

    And your usage of the module as an array (perfectly understandable, given the module's provision of these methods) is exactly what is giving you the problem.

    Simplified, you are

    1. Querying the size of the array: (my $amtInQ = $queue->pending();).
    2. Then looping over the array by index: (for(my $i = 0; $i < $amtInQ; $i++){).
    3. Then peeking at array(i): (my $node = $queue->peek($i);).
    4. Then either:
      1. Doing nothing with it: (if(int($node->{canBeSet}) == 0){).
      2. Or: doing something with it then removing it from the array: ($queue->extract($i);).
    5. Then looping back to process the next index.

    But ... by extracting (splicing) an element from the array, there are now fewer items in it than there were when you queried it (my $amtInQ = $queue->pending();), and so when you get towards the end of your loop, there are no ith items left to peek(). It is the very fact that you are using array semantics on the queue that creates the problem.
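    The failure is easy to reproduce with a plain array, which is all the queue is underneath. Here is a minimal sketch (the node data is made up, and a plain splice() stands in for extract()):

```perl
use strict;
use warnings;

# Four fake nodes; every other one is "ready" (canBeSet true).
my @queue = map { { name => "Node-$_", canBeSet => $_ % 2 } } 1 .. 4;

my $amtInQ = scalar @queue;            # size captured up front, like pending()
for( my $i = 0; $i < $amtInQ; $i++ ) {
    my $node = $queue[$i];             # like peek($i)
    if( !defined $node ) {             # indices past the new end come back undef
        print "index $i is now undefined\n";
        next;
    }
    if( $node->{canBeSet} ) {
        splice @queue, $i, 1;          # like extract($i): the array shrinks,
    }                                  # but $amtInQ and $i march on regardless
}
```

    Every extract() shifts the surviving elements down one, so the captured size and the running index both overshoot the real end of the array; that is exactly where your undefined peek($i) results come from.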

    This is how I would code that same loop:

    while( my $node = $queue->dequeue ) {
        if( $node->{canBeSet} ) {
            ## deal with this node
        }
        else {
            $node->{canBeSet} = 1;
            $queue->enqueue( $node ); ## push back for next time.
        }
    }

    Queue semantics, no busy loops, and no need to sleep to avoid burning CPU.

    Now, one objection you might have to that is your threadDone check and processing.

    But if you use my queue processing loop above, your killThread() method simply becomes:

    sub killThread{
        $queue->enqueue( undef );
    }

    When the undef is dequeued, the while loop ends and the thread self-terminates.

    There's a slight wrinkle with that. If the undef has been queued and then you encounter a node with canBeSet = 0, then my code would requeue that node after the undef, and the loop would terminate before the node gets reprocessed, which is explicitly what you want to avoid. So I would then recast my version of the loop like this:

    while( 1 ) {
        my $node = $queue->dequeue;
        if( !defined $node ) {
            if( $queue->pending ) {
                $queue->enqueue( undef ); ## requeue the terminator behind any pushed-back nodes
                next;
            }
            last; ## queue drained; terminate
        }
        if( $node->{canBeSet} ) {
            ## deal with this node
        }
        else {
            $node->{canBeSet} = 1;
            $queue->enqueue( $node ); ## push back for next time.
        }
    }

    Again, it retains the queue semantics, avoiding the busy loop, but ensures the queue gets cleared before terminating.

    However ... I suspect that your entire design can be significantly further simplified, but I'll need to think on that further and I'll save it for another post.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
      But ... by extracting (splicing) an element from the array, there are now fewer items in it than there were when you queried it (my $amtInQ = $queue->pending();), and so when you get towards the end of your loop, there are no ith items left to peek(). It is the very fact that you are using array semantics on the queue that creates the problem.

      Ah, can't believe I missed that one. So obvious. Thank you.

Re^5: Thread terminating abnormally COND_SIGNAL(6)
by BrowserUk (Patriarch) on Jul 17, 2013 at 02:27 UTC

    I suspect this has something to do with your simplifications, but for the life of me I cannot see what the purpose of the JobQueue is?

    1. Jobs/Nodes get queued onto it by the first set of threads.
    2. Another thread monitors it, pulls them off, sets a random value into them and then sets them into a shared hash in the main thread;
    3. But then you do nothing with that?

    You seem to be getting the "results" in the main thread via the results queue; so what is the jobs queue/nodes hash for?



      Ok, I'll try to explain a bit further, and I apologize for the confusion. The random variable was just to test having multiple values being put into the "return value" hash; in this demo, it does indeed do absolutely nothing.

      In my main program, clients connect, and threadPool1 handles their requests. They issue commands, and can either specify those commands to run in their own thread, so they can see the output in the interactive shell, or to run in the background (and get pushed to threadPool2).

      So, the jobQueue. When users issue commands, their commands cannot always be run right away, depending on their criteria and what machine in our farm they want to run on. The queue maintains the order commands were issued in by using the JobNodes, which contain information from the command entered. The jobQueue is not REALLY acting as a typical queue, but more as a utility to block the thread that issued the command until it is its turn (using subroutines like enqueueJob() that block until a value is set). The first place it blocks is setting the job number. After the job queue returns the job number, as shown in my demo, the client thread continues. The jobQueue will then pause the command again to tell it what resource it should actually be using.
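      As a side note on that blocking: the 1 until do{ lock %Main::nodes; ... } loop in enqueueJob() is a busy-wait that burns CPU while it spins. threads::shared provides cond_wait/cond_broadcast for exactly this handshake; a minimal sketch of the same pattern (subroutine and variable names are hypothetical, not taken from the posted code):

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my %results :shared;

# Caller side: block until the queue thread publishes a result for us.
sub wait_for_result {
    my $tid = shift;
    lock %results;
    # cond_wait releases the lock while asleep and re-acquires it on wake-up
    cond_wait %results until exists $results{$tid};
    return delete $results{$tid};
}

# Queue-thread side: publish the result and wake any waiters.
sub publish_result {
    my( $tid, $result ) = @_;
    lock %results;
    $results{$tid} = $result;
    cond_broadcast %results;   # wake every waiter; each rechecks its own key
}
```

      The waiting thread sleeps inside cond_wait instead of spinning, and the until guard protects against spurious wake-ups.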

      The purpose of the JobQueue is that we want to preserve the order the commands are issued in, regardless of whether the database is updated before the queue gets back to a job that was deferred. For example:

      1. User1 issues 2 commands for machine M1
      2. User2 issues 2 commands for M1, and M1 can only have 3 commands running
      3. User3 issues 2 commands for M1

      Our queue now has (user2, user3, user3)

      The current approach, using the Thread::Queue as an array, allows us to issue one DB query at the beginning of each iteration for the state of all our resources, so we can reference that instead of querying the database for every job in the queue every time we check it. So our process is:

      1. Query database for current state and build hash
      2. check nodes
      3. repeat the steps above, starting at the beginning of the array

      This allows us to preserve the order because of the following: if we dequeue and re-enqueue user2's command, and one of user1's commands finishes and updates our database, then when we dequeue user3's command, it will see that M1 has an available resource, and run that job ahead of user2's. That is what we want to avoid. Using it as a proper queue would not preserve the order in that fringe case without some more tinkering.

        The purpose of the JobQueue is we want to preserve the order the commands are issued in ... This allows us to preserve the order because: if we dequeue/enqueue user2's, and one of user1's finishes and updates our database, then when we dequeue user3's command, it will see that M1 has an available resource, and run that job. That is what we want to avoid. Using it as a proper queue would not preserve the order in that fringe case without some more tinkering.

        Hm. But, queues DO preserve order. That's kind of their raison d'être. (And they also 'close up the gaps' automatically!)

        The problem -- I would suggest -- is that you are storing multiple requests as single items.

        If instead (in your scenario above) you queued two items for each of your 3 users, you could now process that queue, queue-wise: re-queuing anything that isn't yet ready and discarding (moving elsewhere) anything that is complete. The queue will take care of keeping things in their right order and ensuring that the 'holes' get closed up, all without you having to mess around with indices trying to remember what's been removed and what hasn't.
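        A rough sketch of that one-item-per-command shape, reusing the undef terminator idea from earlier in the thread (the field names and the machine_ready() check are made up for illustration; in the real program the check would be the DB/state lookup):

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Qjobs = Thread::Queue->new;

# One queue entry per command, not per user.
$Qjobs->enqueue( { user => 'user2', cmd => 'run a', machine => 'M1' } );
$Qjobs->enqueue( { user => 'user3', cmd => 'run b', machine => 'M1' } );
$Qjobs->enqueue( { user => 'user3', cmd => 'run c', machine => 'M1' } );
$Qjobs->enqueue( undef );   # terminator, just for this sketch

while( 1 ) {
    my $job = $Qjobs->dequeue;
    if( !defined $job ) {
        if( $Qjobs->pending ) { $Qjobs->enqueue( undef ); next }
        last;
    }
    if( machine_ready( $job->{machine} ) ) {
        print "dispatch $job->{user}: $job->{cmd}\n";
    }
    else {
        $Qjobs->enqueue( $job );   # not ready yet: goes to the back; order
    }                              # among ready jobs stays FIFO
}

sub machine_ready { 1 }   # placeholder for the real resource check
```

        Each command either dispatches or cycles to the back of the queue; the queue itself closes the gaps, and no index bookkeeping is needed.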

        Just a thought.


        That said, I'm still not clear on why you need the shared %nodes hash, when the results from the jobs are returned to the main thread via $Qresults?

