UpTide, welcome to the monastery.
The following is a modification of BrowserUk's demonstration. Tasks A, B, and C are spawned with 1, 2, and 3 workers respectively. If threads are desired, load threads before MCE. By default, MCE spawns child processes on UNIX and Cygwin.
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
# Producer stage. Feeds 1e5 random integers in 1000..1999 to the B queue and
# 1e5 random integers in 2000..2999 to the C queue, alternating between the
# two, then tells the B stage that no more input is coming.
#
#   $queue_b - queue read by the task_b workers
#   $queue_c - queue read by the task_c workers
sub task_a {
    my ($queue_b, $queue_c) = @_;

    my $remaining = 1e5;
    while ($remaining-- > 0) {
        my $for_b = 1e3 + int(rand 1000);
        $queue_b->enqueue($for_b);

        my $for_c = 2e3 + int(rand 1000);
        $queue_c->enqueue($for_c);
    }

    # One undef terminator per task_b worker (two are spawned by the driver).
    $queue_b->enqueue((undef) x 2);
}
# Middle stage. Takes values from $Qb, multiplies each by 10 and forwards the
# result to $Qc. Stops when it dequeues undef (task_a enqueues one undef per
# task_b worker).
#
# This sub deliberately does NOT terminate $Qc. With several task_b workers
# running, the first one to finish would otherwise push the terminators while
# its siblings are still forwarding results; the task_c workers would then
# exit early and the late results would never be printed. The driver below
# closes $Qc once every producer has been joined.
sub task_b {
    my ($Qb, $Qc) = @_;

    # Test definedness, not truth, so a legitimate 0 would not end the loop.
    while ( defined( my $arg1 = $Qb->dequeue() ) ) {
        $Qc->enqueue( $arg1 * 10 );
    }

    return;
}

# Final stage. Prints every value taken from $Qc, one per line, until it
# dequeues undef.
sub task_c {
    my ($Qc) = @_;

    while ( defined( my $arg1 = $Qc->dequeue() ) ) {
        print "$arg1\n";
    }

    return;
}

# --- driver -----------------------------------------------------------------

my $Qb = Thread::Queue->new;
my $Qc = Thread::Queue->new;

# Must equal the number of undef terminators task_a puts on $Qb (two).
my $b_workers = 2;
my $c_workers = 3;

# task_a and task_b both write to $Qc, so both count as producers for it.
my @producers = ( threads->create( \&task_a, $Qb, $Qc ) );
push @producers, threads->create( \&task_b, $Qb, $Qc ) for 1 .. $b_workers;

my @consumers = map { threads->create( \&task_c, $Qc ) } 1 .. $c_workers;

# Only after every producer has finished is it safe to terminate $Qc:
# nothing can be enqueued behind the terminators any more.
$_->join for @producers;
$Qc->enqueue(undef) for 1 .. $c_workers;
$_->join for @consumers;

print "Done\n";
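The following does the same thing using the MCE module. The await method throttles task A: every 200 iterations it waits until the named queue has drained to 200 or fewer pending items, which keeps the queues from growing without bound.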
#!/usr/bin/perl
use strict;
use warnings;
use MCE::Step;
# Producer task (task 'A'). Enqueues 1e5 random integers in 1000..1999 for
# task 'B' and 1e5 random integers in 2000..2999 for task 'C', alternating
# between the two. On every 200th iteration it calls MCE->await on each queue
# (threshold 200) just before enqueueing to it.
sub task_a {
    for my $iteration (1 .. 1e5) {
        my $throttle_now = ($iteration % 200 == 0);

        MCE->await('B', 200) if $throttle_now;
        my $for_b = 1e3 + int(rand 1000);
        MCE->enq('B', $for_b);

        MCE->await('C', 200) if $throttle_now;
        my $for_c = 2e3 + int(rand 1000);
        MCE->enq('C', $for_c);
    }
}
# Middle task (task 'B'). Called with the MCE object followed by the enqueued
# arguments; only the first argument is used. Forwards ten times that value
# to task 'C'.
sub task_b {
    my ($mce, @args) = @_;

    my $scaled = $args[0] * 10;
    MCE->enq('C', $scaled);
}
# Final task (task 'C'). Called with the MCE object followed by the enqueued
# arguments; prints the first argument on a line of its own and ignores the
# rest.
sub task_c {
    my (undef, $item) = @_;

    print "$item\n";
}
# Pipeline configuration: the Nth task name and the Nth worker count belong
# to the Nth sub handed to mce_step below (A => task_a with 1 worker,
# B => task_b with 2, C => task_c with 3).
my @task_names    = ( 'A', 'B', 'C' );
my @worker_counts = ( 1, 2, 3 );

MCE::Step->init(
    task_name   => \@task_names,
    max_workers => \@worker_counts,
);

# Run the three tasks as one pipeline, then report completion.
mce_step( \&task_a, \&task_b, \&task_c );

print "Done\n";
|