PerlMonks  

Data Between Threads

by UpTide (Novice)
on Aug 17, 2016 at 14:34 UTC ( [id://1169918]=perlquestion )

UpTide has asked for the wisdom of the Perl Monks concerning the following question:

So I was reading about threads, and I was wondering if/how I can send output from one thread to another thread? For example I have three threads (A, B, and C). I want to send the results of A to both B and C. B will do work on what it got from A then send that to C. Finally, C will now start work.

Any suggestions? I looked at threads::shared, but not all the data needs to be accessible from each thread. (such as how A doesn't need B's output to C)

Replies are listed 'Best First'.
Re: Data Between Threads
by Corion (Patriarch) on Aug 17, 2016 at 14:39 UTC

    The approach that I found to be manageable is to use Thread::Queue and have threads talk to each other using that, especially if the communication only needs to go in one direction.

Re: Data Between Threads
by BrowserUk (Patriarch) on Aug 17, 2016 at 15:14 UTC

    Simple example to get you started:

    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;

    sub threadA {
        my $Qab = shift;
        for( 1 .. 1e6 ) {
            $Qab->enqueue( 1+int rand 1000 );
        }
        $Qab->enqueue( undef );
    }

    sub threadB {
        my( $Qab, $Qbc ) = @_;
        while( my $in = $Qab->dequeue() ) {
            $Qbc->enqueue( $in * 10 );
        }
        $Qbc->enqueue( undef );
    }

    my $Qab = new Thread::Queue;
    my $Qbc = new Thread::Queue;

    threads->new( \&threadA, $Qab )->detach;
    threads->new( \&threadB, $Qab, $Qbc )->detach;

    print while defined( $_ = $Qbc->dequeue );

    print 'Done';
    The main thread is your thread C.

    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I knew I was on the right track :)
    In the absence of evidence, opinion is indistinguishable from prejudice.

      On the threadA you are putting shift into $Qab. What is this doing? I am assuming it is to put the parameter into the sub's $Qab, but correct me if I am wrong.

      Also what role does $Qab->enqueue( undef ); provide outside of the while loop? Shouldn't the queue be undefined while it is empty? If that is the case what is the advantage of explicitly enqueuing undef?

      Thanks everyone for your valuable time!

        On the threadA you are putting shift into $Qab. What is this doing? I am assuming it is to put the parameter into the sub's $Qab, but correct me if I am wrong.

        Correct. A thread function is just a normal function that takes its arguments in the normal way.
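To illustrate the point about argument passing, here is a minimal sketch. The worker names `scale_shift` and `scale_list` and the values 7 and 10 are made up for the illustration; only `threads->create` and `join` come from the threads module.

```perl
use strict;
use warnings;
use threads;

# Hypothetical worker: unpacks its arguments with shift, one at a time.
sub scale_shift {
    my $value  = shift;
    my $factor = shift;
    return $value * $factor;
}

# The same worker, unpacking @_ in a single list assignment.
sub scale_list {
    my ( $value, $factor ) = @_;
    return $value * $factor;
}

# Everything after the code reference arrives in the function as @_.
my $thr_shift = threads->create( \&scale_shift, 7, 10 );
my $thr_list  = threads->create( \&scale_list,  7, 10 );

# join() waits for the thread and hands back the function's return value.
my $from_shift = $thr_shift->join;
my $from_list  = $thr_list->join;

print "$from_shift $from_list\n";
```

Both styles behave identically; which one to use is a matter of taste and of how many parameters the function takes.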

        Also what role does $Qab->enqueue( undef );

        Once threadA has finished its loop, it queues undef before terminating. That undef causes the while loop in threadB to terminate, whereupon threadB enqueues its own undef, which causes the dequeue loop in the main thread (thread C) to terminate.
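A small sketch of the terminator pattern described above, with one adjustment that is an assumption on my part rather than something from the original example: the consumer tests `defined` instead of truth, so a legitimate value of 0 would not end the loop early. The variable names are illustrative only.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue = Thread::Queue->new;

# Producer: sends three values (including 0), then the undef terminator.
my $producer = threads->create( sub {
    $queue->enqueue($_) for 0 .. 2;
    $queue->enqueue(undef);
    return;
} );

# Consumer: stops only on undef, so the false-but-valid 0 is still counted.
my $consumer = threads->create( sub {
    my @seen;
    while ( defined( my $item = $queue->dequeue ) ) {
        push @seen, $item;
    }
    return scalar @seen;
} );

$producer->join;
my $count = $consumer->join;
print "consumer received $count items\n";
```

In the original example the values are always between 1 and 1000, so the plain truth test there is safe; the `defined` form only matters when 0 or an empty string can travel through the queue.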



        The shift function in a subroutine takes the first item from the parameter list @_, which in threadA's case is the Thread::Queue object $Qab. It was put there by the threads->new(...) method call. threadB has two parameters, so he used an alternate syntax for that. He could have written:

        sub threadB { my $Qab = shift; my $Qbc = shift; ...

        He puts the undef value in the queue as a way of telling whichever thread reads the queue that it has reached the last value. When threadB's while loop gets this value, it will terminate. If this value were not there, the dequeue method would keep waiting for a value.
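As an alternative to enqueuing undef by hand, newer releases of Thread::Queue provide an `end` method. The sketch below assumes Thread::Queue 3.01 or later; once the queue has been ended and drained, `dequeue` returns undef instead of blocking. The sum over 1 .. 10 is just a stand-in workload.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue = Thread::Queue->new;

my $consumer = threads->create( sub {
    my $sum = 0;
    # After end(), dequeue returns undef once the queue is empty.
    while ( defined( my $item = $queue->dequeue ) ) {
        $sum += $item;
    }
    return $sum;
} );

$queue->enqueue( 1 .. 10 );
$queue->end;    # no explicit undef marker; assumes Thread::Queue >= 3.01

my $sum = $consumer->join;
print "sum: $sum\n";
```

With several consumers on one queue, a single `end` call releases all of them, so there is no need to count out one terminator per worker.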

        But God demonstrates His own love toward us, in that while we were yet sinners, Christ died for us. Romans 5:8 (NASB)

Re: Data Between Threads
by hippo (Bishop) on Aug 17, 2016 at 14:45 UTC
    For example I have three threads (A, B, and C). I want to send the results of A to both B and C. B will do work on what it got from A then send that to C. Finally, C will now start work.

    That sounds like queues to me. Take a look at Thread::Queue for how to push items into a queue and have another thread read items from it.

Re: Data Between Threads
by Anonymous Monk on Aug 24, 2016 at 22:47 UTC

    UpTide, welcome to the monastery.

    The following is a modification of BrowserUk's demonstration. Tasks A, B, and C are spawned with 1, 2, and 3 workers respectively. If threads is desired, load threads before MCE. By default, MCE spawns child processes on UNIX and Cygwin.

    #!/usr/bin/perl
    use strict;
    use warnings;
    use threads;
    use Thread::Queue;

    sub task_a {
        my ($Qb, $Qc) = @_;
        for (1 .. 1e5) {
            $Qb->enqueue(1e3 + int(rand 1000));
            $Qc->enqueue(2e3 + int(rand 1000));
        }
        $Qb->enqueue(undef) for 1 .. 2;
    }

    sub task_b {
        my ($Qb, $Qc) = @_;
        while ( my $arg1 = $Qb->dequeue() ) {
            $Qc->enqueue($arg1 * 10);
        }
        $Qc->enqueue(undef) for 1 .. 3;
    }

    sub task_c {
        my ($Qc) = @_;
        while ( my $arg1 = $Qc->dequeue() ) {
            print "$arg1\n";
        }
    }

    my $Qb = new Thread::Queue;
    my $Qc = new Thread::Queue;

    my @thrs;
    push @thrs, threads->new( \&task_a, $Qb, $Qc ) for 1 .. 1;
    push @thrs, threads->new( \&task_b, $Qb, $Qc ) for 1 .. 2;
    push @thrs, threads->new( \&task_c, $Qc ) for 1 .. 3;

    $_->join for @thrs;

    print "Done\n";
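One caveat with the threaded version above: each B worker enqueues the terminators for C as soon as it finishes, while the other B worker may still be enqueuing a result, so the C workers could in principle all stop with an item left in the queue. A possible way around that, sketched here with a reduced workload of 100 items and a counting `task_c` (both my own simplifications, not the poster's code), is to let the main thread join the B workers first and only then send C its terminators.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Qb = Thread::Queue->new;
my $Qc = Thread::Queue->new;

sub task_b {
    while ( defined( my $in = $Qb->dequeue ) ) {
        $Qc->enqueue( $in * 10 );
    }
    return;    # no terminator here: another B worker may still be producing
}

sub task_c {
    my $count = 0;
    while ( defined( my $item = $Qc->dequeue ) ) {
        $count++;
    }
    return $count;    # number of items this worker handled
}

my ( @b, @c );
push @b, threads->create( \&task_b ) for 1 .. 2;
# Explicit scalar context so join() returns the single count.
push @c, threads->create( { context => 'scalar' }, \&task_c ) for 1 .. 3;

$Qb->enqueue( 1 .. 100 );
$Qb->enqueue(undef) for @b;    # one terminator per B worker

$_->join for @b;               # every B result is now in $Qc
$Qc->enqueue(undef) for @c;    # only now is it safe to stop the C workers

my $total = 0;
$total += $_->join for @c;
print "C workers handled $total items\n";
```

Because the terminators for C are queued after every B worker has exited, they are guaranteed to sit behind all real items in the FIFO.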

    The following does the same thing using the MCE module. The await method prevents the queue from exceeding 200 items.

    #!/usr/bin/perl
    use strict;
    use warnings;
    use MCE::Step;

    sub task_a {
        for (1 .. 1e5) {
            MCE->await('B', 200) if $_ % 200 == 0;
            MCE->enq('B', 1e3 + int(rand 1000));
            MCE->await('C', 200) if $_ % 200 == 0;
            MCE->enq('C', 2e3 + int(rand 1000));
        }
    }

    sub task_b {
        my ($mce, $arg1, $arg2, $argN) = @_;
        MCE->enq('C', $arg1 * 10);
    }

    sub task_c {
        my ($mce, $arg1, $arg2, $argN) = @_;
        print "$arg1\n";
    }

    MCE::Step->init(
        task_name   => [ 'A', 'B', 'C' ],
        max_workers => [ 1, 2, 3 ],
    );

    mce_step \&task_a, \&task_b, \&task_c;

    print "Done\n";

      The ->enq method can send an array or hash reference as well. Serialization is handled automatically behind the scenes.

      MCE->enq( 'B', [ 1e3 + int(rand 1000) ] );
      MCE->enq( 'B', { arg1 => 1e3 + int(rand 1000) } );
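For comparison, the plain Thread::Queue approach used earlier in this thread can carry references too. The sketch below assumes a Thread::Queue release that makes shared clones of enqueued references automatically (to my knowledge the 2.x and later releases do); the fixed values 1000 and 2000 are placeholders for the random numbers used above.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue = Thread::Queue->new;

# Worker: receives one array reference and one hash reference.
my $worker = threads->create( sub {
    my $aref = $queue->dequeue;
    my $href = $queue->dequeue;
    return $aref->[0] + $href->{arg1};
} );

# enqueue() clones each structure into shared memory before queuing it.
$queue->enqueue( [ 1000 ] );
$queue->enqueue( { arg1 => 2000 } );

my $result = $worker->join;
print "result: $result\n";
```

Note that the worker gets a copy of each structure, so changes it makes are not visible in the structures the sender built.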

      Warm regards, Mario.

Front-paged by Arunbear