use strict;
use warnings;
use threads;
use Thread::Queue 3.07;
use Sereal qw( decode_sereal encode_sereal );
use Time::HiRes qw( time );
use constant {
MAX_WORKERS => 3,
CHUNK_SIZE => 5,
};
# usage: script_que_thrs.pl [ N ]
my ( @wrks_in, @wrks_out );
my $que_in = Thread::Queue->new(); $que_in->limit = 50;
my $que_out = Thread::Queue->new();
my $start = time;
push @wrks_in, threads->create(\&task) for 1 .. MAX_WORKERS;
push @wrks_out, threads->create(\&output);
input( shift || 1000 );
$que_in->end; $_->join for @wrks_in;
$que_out->end; $_->join for @wrks_out;
printf "duration: %0.3f\n", time - $start;
sub input {
my $max_id = shift;
my $nxt_id = 0;
my @data;
while ( 1 ) {
last if $nxt_id >= $max_id;
push @data, ++$nxt_id;
if ( @data == CHUNK_SIZE ) {
$que_in->enqueue( encode_sereal(\@data) );
@data = ();
}
}
$que_in->enqueue( encode_sereal(\@data) ) if @data;
}
sub task {
while ( defined ( my $frozen = $que_in->dequeue ) ) {
my $array_ref = decode_sereal($frozen);
my $output = '';
for ( 0 .. $#{ $array_ref } ) {
$output .= $array_ref->[$_]."\n";
}
$que_out->enqueue($output);
}
}
sub output {
open my $fh_out, ">:utf8", "output.txt";
while ( defined ( my $output = $que_out->dequeue ) ) {
print {$fh_out} $output;
}
close $fh_out;
}
####
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;
use Sereal qw( decode_sereal encode_sereal );
use Time::HiRes qw( time );
use constant {
MAX_WORKERS => 3,
CHUNK_SIZE => 5,
};
# usage: script_que_hobo.pl [ N ]
my ( @wrks_in, @wrks_out );
my $que_in = MCE::Shared->queue( await => 1 );
my $que_out = MCE::Shared->queue();
my $start = time;
push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
push @wrks_out, MCE::Hobo->create(\&output);
input( shift || 1000 );
$que_in->end; $_->join for @wrks_in;
$que_out->end; $_->join for @wrks_out;
printf "duration: %0.3f\n", time - $start;
sub input {
my $max_id = shift;
my $nxt_id = 0;
my @data;
while ( 1 ) {
last if $nxt_id >= $max_id;
push @data, ++$nxt_id;
if ( @data == CHUNK_SIZE ) {
$que_in->await(50); # limit
$que_in->enqueue( encode_sereal(\@data) );
@data = ();
}
}
$que_in->enqueue( encode_sereal(\@data) ) if @data;
}
sub task {
while ( defined ( my $frozen = $que_in->dequeue ) ) {
my $array_ref = decode_sereal($frozen);
my $output = '';
for ( 0 .. $#{ $array_ref } ) {
$output .= $array_ref->[$_]."\n";
}
$que_out->enqueue($output);
}
}
sub output {
open my $fh_out, ">:utf8", "output.txt";
while ( defined ( my $output = $que_out->dequeue ) ) {
print {$fh_out} $output;
}
close $fh_out;
}
##
##
use strict;
use warnings;
use MCE::Hobo;
use MCE::Mutex;
use MCE::Shared;
use Sereal qw( decode_sereal encode_sereal );
use Time::HiRes qw( time );
use constant {
MAX_WORKERS => 3,
CHUNK_SIZE => 5,
};
# usage: script_que_hobo2.pl [ N ]
my ( @wrks_in, @wrks_out );
my $que_in = MCE::Shared->queue( await => 1 );
my $mutex = MCE::Mutex->new();
my $start = time;
open my $fh_out, ">:utf8", "output.txt";
$fh_out->autoflush(1);
push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
input( shift || 1000 );
$que_in->end; $_->join for @wrks_in;
close $fh_out;
printf "duration: %0.3f\n", time - $start;
sub input {
my $max_id = shift;
my $nxt_id = 0;
my @data;
while ( 1 ) {
last if $nxt_id >= $max_id;
push @data, ++$nxt_id;
if ( @data == CHUNK_SIZE ) {
$que_in->await(50);
$que_in->enqueue( encode_sereal(\@data) );
@data = ();
}
}
$que_in->enqueue( encode_sereal(\@data) ) if @data;
}
sub task {
while ( defined ( my $frozen = $que_in->dequeue ) ) {
my $array_ref = decode_sereal($frozen);
my $output = '';
for ( 0 .. $#{ $array_ref } ) {
$output .= $array_ref->[$_]."\n";
}
$mutex->enter(sub {
print {$fh_out} $output;
});
}
}