# ---------------------------------------------------------------------------
# Script 1 of 3: producer / worker / writer pipeline built on ithreads and
# Thread::Queue.  The "####" lines below separate independent scripts; each
# one names its own file in its usage comment.
# ---------------------------------------------------------------------------
use strict;
use warnings;

use threads;
use Thread::Queue 3.07;
use Sereal qw( decode_sereal encode_sereal );
use Time::HiRes qw( time );

use constant {
    MAX_WORKERS => 3,
    CHUNK_SIZE  => 5,
};

# usage: script_que_thrs.pl [ N ]

my ( @wrks_in, @wrks_out );

my $que_in = Thread::Queue->new();
$que_in->limit = 50;    # cap on pending input items (lvalue method, Thread::Queue 3.07+)
my $que_out = Thread::Queue->new();

my $start = time;

push @wrks_in,  threads->create(\&task) for 1 .. MAX_WORKERS;
push @wrks_out, threads->create(\&output);

# N defaults to 1000 when no (or a false) argument is given.
input( shift || 1000 );

# Ending a queue makes dequeue return undef once it is drained, which is
# what terminates the while loops in task() and output().
$que_in->end;
$_->join for @wrks_in;

# Only end the output queue after every task thread has finished enqueueing.
$que_out->end;
$_->join for @wrks_out;

printf "duration: %0.3f\n", time - $start;

# input( $max_id )
# Enqueues the ids 1 .. $max_id on $que_in as Sereal-encoded array refs of
# CHUNK_SIZE ids each; a final shorter chunk carries any remainder.
sub input {
    my $max_id = shift;
    my $nxt_id = 0;
    my @data;

    while ( $nxt_id < $max_id ) {
        push @data, ++$nxt_id;
        if ( @data == CHUNK_SIZE ) {
            $que_in->enqueue( encode_sereal(\@data) );
            @data = ();
        }
    }

    $que_in->enqueue( encode_sereal(\@data) ) if @data;
    return;
}

# task()
# Worker body: decodes each chunk taken from $que_in, renders it as one
# newline-terminated line per id, and enqueues that string on $que_out.
sub task {
    while ( defined ( my $frozen = $que_in->dequeue ) ) {
        my $array_ref = decode_sereal($frozen);
        my $output    = join '', map { "$_\n" } @{ $array_ref };
        $que_out->enqueue($output);
    }
    return;
}

# output()
# Writer body: appends every string taken from $que_out to output.txt.
# Dies if the file cannot be opened or if close reports a write error.
sub output {
    open my $fh_out, ">:utf8", "output.txt"
        or die "Cannot open output.txt for writing: $!";

    while ( defined ( my $output = $que_out->dequeue ) ) {
        print {$fh_out} $output;
    }

    # Buffered write failures surface at close, so it must be checked.
    close $fh_out
        or die "Cannot close output.txt: $!";
    return;
}

####

# ---------------------------------------------------------------------------
# Script 2 of 3: the same pipeline using MCE::Hobo workers and
# MCE::Shared queues in place of threads and Thread::Queue.
# ---------------------------------------------------------------------------
use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;
use Sereal qw( decode_sereal encode_sereal );
use Time::HiRes qw( time );

use constant {
    MAX_WORKERS => 3,
    CHUNK_SIZE  => 5,
};

# usage: script_que_hobo.pl [ N ]

my ( @wrks_in, @wrks_out );

# await => 1 enables the await() call used in input() below.
my $que_in  = MCE::Shared->queue( await => 1 );
my $que_out = MCE::Shared->queue();

my $start = time;

push @wrks_in,  MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
push @wrks_out, MCE::Hobo->create(\&output);

# N defaults to 1000 when no (or a false) argument is given.
input( shift || 1000 );

$que_in->end;
$_->join for @wrks_in;

# Only end the output queue after every task worker has finished enqueueing.
$que_out->end;
$_->join for @wrks_out;

printf "duration: %0.3f\n", time - $start;

# input( $max_id )
# Enqueues the ids 1 .. $max_id on $que_in as Sereal-encoded array refs of
# CHUNK_SIZE ids each; a final shorter chunk carries any remainder.
sub input {
    my $max_id = shift;
    my $nxt_id = 0;
    my @data;

    while ( $nxt_id < $max_id ) {
        push @data, ++$nxt_id;
        if ( @data == CHUNK_SIZE ) {
            # limit: per the MCE::Shared::Queue docs, wait until the queue
            # has drained to 50 pending items or fewer before adding more.
            $que_in->await(50);
            $que_in->enqueue( encode_sereal(\@data) );
            @data = ();
        }
    }

    # The trailing partial chunk is enqueued without waiting, as before.
    $que_in->enqueue( encode_sereal(\@data) ) if @data;
    return;
}

# task()
# Worker body: decodes each chunk taken from $que_in, renders it as one
# newline-terminated line per id, and enqueues that string on $que_out.
sub task {
    while ( defined ( my $frozen = $que_in->dequeue ) ) {
        my $array_ref = decode_sereal($frozen);
        my $output    = join '', map { "$_\n" } @{ $array_ref };
        $que_out->enqueue($output);
    }
    return;
}

# output()
# Writer body: appends every string taken from $que_out to output.txt.
# Dies if the file cannot be opened or if close reports a write error.
sub output {
    open my $fh_out, ">:utf8", "output.txt"
        or die "Cannot open output.txt for writing: $!";

    while ( defined ( my $output = $que_out->dequeue ) ) {
        print {$fh_out} $output;
    }

    # Buffered write failures surface at close, so it must be checked.
    close $fh_out
        or die "Cannot close output.txt: $!";
    return;
}

####

# ---------------------------------------------------------------------------
# Script 3 of 3: MCE::Hobo workers write to a shared, autoflushed file
# handle themselves, serialised by an MCE::Mutex, instead of going through
# an output queue and a dedicated writer.
# ---------------------------------------------------------------------------
use strict;
use warnings;

use IO::Handle;    # core; provides the autoflush() method called on $fh_out
use MCE::Hobo;
use MCE::Mutex;
use MCE::Shared;
use Sereal qw( decode_sereal encode_sereal );
use Time::HiRes qw( time );

use constant {
    MAX_WORKERS => 3,
    CHUNK_SIZE  => 5,
};

# usage: script_que_hobo2.pl [ N ]

my ( @wrks_in, @wrks_out );

# await => 1 enables the await() call used in input() below.
my $que_in = MCE::Shared->queue( await => 1 );
my $mutex  = MCE::Mutex->new();

my $start = time;

# Opened before the workers are created so that task() can print to it.
open my $fh_out, ">:utf8", "output.txt"
    or die "Cannot open output.txt for writing: $!";

# Unbuffered, so each print issued under the mutex is written immediately.
$fh_out->autoflush(1);

push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;

# N defaults to 1000 when no (or a false) argument is given.
input( shift || 1000 );

$que_in->end;
$_->join for @wrks_in;

close $fh_out
    or die "Cannot close output.txt: $!";

printf "duration: %0.3f\n", time - $start;

# input( $max_id )
# Enqueues the ids 1 .. $max_id on $que_in as Sereal-encoded array refs of
# CHUNK_SIZE ids each; a final shorter chunk carries any remainder.
sub input {
    my $max_id = shift;
    my $nxt_id = 0;
    my @data;

    while ( $nxt_id < $max_id ) {
        push @data, ++$nxt_id;
        if ( @data == CHUNK_SIZE ) {
            # limit: per the MCE::Shared::Queue docs, wait until the queue
            # has drained to 50 pending items or fewer before adding more.
            $que_in->await(50);
            $que_in->enqueue( encode_sereal(\@data) );
            @data = ();
        }
    }

    # The trailing partial chunk is enqueued without waiting, as before.
    $que_in->enqueue( encode_sereal(\@data) ) if @data;
    return;
}

# task()
# Worker body: decodes each chunk taken from $que_in, renders it as one
# newline-terminated line per id, and prints it to $fh_out while holding
# $mutex so that only one worker writes at a time.
sub task {
    while ( defined ( my $frozen = $que_in->dequeue ) ) {
        my $array_ref = decode_sereal($frozen);
        my $output    = join '', map { "$_\n" } @{ $array_ref };

        # print is deliberately left unchecked here: dying inside the
        # callback would happen while the mutex is held.
        $mutex->enter(sub { print {$fh_out} $output; });
    }
    return;
}