Re^2: MCE: Slow IPC between child and gather process in parent

by marioroy (Priest)
on May 06, 2018 at 19:20 UTC


in reply to Re: MCE: Slow IPC between child and gather process in parent
in thread MCE: Slow IPC between child and gather process in parent

Hi again,

Disclosure: I tried various things in case the application is IPC-bound, which may not be the case for the OP.

Chunking reduces the number of IPC operations, which relieves pressure on the manager process. Below are the same demonstrations with chunking applied. Basically, chunking requires looping inside the iterator and the user_func. In other words, chunking is a way to keep the application from becoming IPC-bound.
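For contrast, here is a minimal non-chunked sketch of the Gather approach (just an illustration, not one of the timed demonstrations below): with chunk_size => 1, every item makes its own gather trip to the manager process, which is what drives the IPC cost.

use strict; use warnings;

use MCE;
use MCE::Candy;

# non-chunked for contrast: one gather operation per item

open my $fh_out, ">:utf8", "output.txt";

my $mce = MCE->new(
    gather      => MCE::Candy::out_iter_fh($fh_out),
    max_workers => 3,
    chunk_size  => 1,                 # one item per IPC operation
    user_func   => sub {
        my ( $mce, $chunk_ref, $chunk_id ) = @_;
        MCE->gather( $chunk_id, $chunk_ref->[0] . "\n" );
    },
)->spawn;

$mce->process( [ 1 .. 1000 ] );       # an array ref works as input, too
$mce->shutdown;
close $fh_out;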

Iterator & Gather demonstration

use strict; use warnings;

use MCE;
use MCE::Candy;
use Time::HiRes 'time';

# usage: script_gather2.pl [ N ]

open my $fh_out, ">:utf8", "output.txt";

my $mce = MCE->new(
    gather      => MCE::Candy::out_iter_fh($fh_out),
    max_workers => 3,
    chunk_size  => 5,
    user_func   => \&user_func,
)->spawn;

my $start = time;

$mce->process( make_iterator( shift || 1000 ) );

my $duration = time - $start;

$mce->shutdown;
close $fh_out;

printf "duration: %0.3f\n", $duration;

sub make_iterator {
    my $max_id = shift;
    my $nxt_id = 0;

    # closure block receives chunk_size value
    return sub {
        my ( $chunk_size, @data ) = @_;
        return if $nxt_id >= $max_id;

        for ( 1 .. $chunk_size ) {
            push @data, ++$nxt_id;
            last if $nxt_id >= $max_id;
        }

        return @data;
    };
}

sub user_func {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;
    my $output = '';

    # chunking requires looping inside user_func
    for ( 0 .. $#{ $chunk_ref } ) {
        $output .= $chunk_ref->[$_] . "\n";
    }

    # gather with chunk_id must be called one time only
    MCE->gather($chunk_id, $output);
}

Iterator & Relay demonstration

use strict; use warnings;

use MCE;
use Time::HiRes 'time';

# usage: script_relay2.pl [ N ]

open my $fh_out, ">:utf8", "output.txt";
$fh_out->autoflush(1);

my $mce = MCE->new(
    init_relay  => 0,
    max_workers => 3,
    chunk_size  => 5,
    user_func   => \&user_func,
)->spawn;

my $start = time;

$mce->process( make_iterator( shift || 1000 ) );

my $duration = time - $start;

$mce->shutdown;
close $fh_out;

printf "duration: %0.3f\n", $duration;

sub make_iterator {
    my $max_id = shift;
    my $nxt_id = 0;

    # closure block receives chunk_size value
    return sub {
        my ( $chunk_size, @data ) = @_;
        return if $nxt_id >= $max_id;

        for ( 1 .. $chunk_size ) {
            push @data, ++$nxt_id;
            last if $nxt_id >= $max_id;
        }

        return @data;
    };
}

sub user_func {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;
    my $output = '';

    # chunking requires looping inside user_func
    for ( 0 .. $#{ $chunk_ref } ) {
        $output .= $chunk_ref->[$_] . "\n";
    }

    # relay must be called one time only
    MCE::relay { print {$fh_out} $output };
}
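A note on the relay demonstration: the relayed block runs serially, one worker at a time in chunk_id order, so workers can print directly to the shared output handle and the output stays ordered without funneling the data through the manager process.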

Regards, Mario

Re^3: MCE: Slow IPC between child and gather process in parent
by marioroy (Priest) on May 07, 2018 at 02:19 UTC

    Hi again,

    Disclosure: I also tried chunking with threads and MCE::Hobo, using a queue, for comparison with the MCE (Iterator & Gather/Relay) demonstrations.

    Below are demonstrations using threads and MCE::Hobo for testing IPC overhead. On Unix platforms (particularly BSD variants), the MCE solution may run up to 2x faster than threads. Increasing CHUNK_SIZE decreases IPC overhead but increases the memory footprint of the $output variable; for example, with 1,000 items and CHUNK_SIZE set to 5, there are roughly 200 enqueue operations per queue instead of 1,000. Chunking/batching is helpful. Just be mindful of memory consumption in the output routine.

    Regarding the input queue, I limit it to 50 items to keep the input routine from populating the queue out of control. Thread::Queue must be a recent version with the limit capability (3.07 or later), or the script will fail.
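    In isolation, the bounded-queue idiom amounts to this (the same settings used in the demonstration below):

    use Thread::Queue 3.07;              # limit support requires 3.07 or later

    my $que_in = Thread::Queue->new();
    $que_in->limit = 50;                 # enqueue() blocks until the count drops below 50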

    Threads & Thread::Queue->new() demonstration

    use strict; use warnings;

    use threads;
    use Thread::Queue 3.07;
    use Sereal qw( decode_sereal encode_sereal );
    use Time::HiRes qw( time );

    use constant {
        MAX_WORKERS => 3,
        CHUNK_SIZE  => 5,
    };

    # usage: script_que_thrs.pl [ N ]

    my ( @wrks_in, @wrks_out );

    my $que_in = Thread::Queue->new();
    $que_in->limit = 50;

    my $que_out = Thread::Queue->new();

    my $start = time;

    push @wrks_in, threads->create(\&task) for 1 .. MAX_WORKERS;
    push @wrks_out, threads->create(\&output);

    input( shift || 1000 );

    $que_in->end;
    $_->join for @wrks_in;

    $que_out->end;
    $_->join for @wrks_out;

    printf "duration: %0.3f\n", time - $start;

    sub input {
        my $max_id = shift;
        my $nxt_id = 0;
        my @data;

        while ( 1 ) {
            last if $nxt_id >= $max_id;
            push @data, ++$nxt_id;

            if ( @data == CHUNK_SIZE ) {
                $que_in->enqueue( encode_sereal(\@data) );
                @data = ();
            }
        }

        $que_in->enqueue( encode_sereal(\@data) ) if @data;
    }

    sub task {
        while ( defined ( my $frozen = $que_in->dequeue ) ) {
            my $array_ref = decode_sereal($frozen);
            my $output = '';

            for ( 0 .. $#{ $array_ref } ) {
                $output .= $array_ref->[$_] . "\n";
            }

            $que_out->enqueue($output);
        }
    }

    sub output {
        open my $fh_out, ">:utf8", "output.txt";

        while ( defined ( my $output = $que_out->dequeue ) ) {
            print {$fh_out} $output;
        }

        close $fh_out;
    }

    MCE::Hobo & MCE::Shared->queue() demonstration

    use strict; use warnings;

    use MCE::Hobo;
    use MCE::Shared;
    use Sereal qw( decode_sereal encode_sereal );
    use Time::HiRes qw( time );

    use constant {
        MAX_WORKERS => 3,
        CHUNK_SIZE  => 5,
    };

    # usage: script_que_hobo.pl [ N ]

    my ( @wrks_in, @wrks_out );

    my $que_in  = MCE::Shared->queue( await => 1 );
    my $que_out = MCE::Shared->queue();

    my $start = time;

    push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
    push @wrks_out, MCE::Hobo->create(\&output);

    input( shift || 1000 );

    $que_in->end;
    $_->join for @wrks_in;

    $que_out->end;
    $_->join for @wrks_out;

    printf "duration: %0.3f\n", time - $start;

    sub input {
        my $max_id = shift;
        my $nxt_id = 0;
        my @data;

        while ( 1 ) {
            last if $nxt_id >= $max_id;
            push @data, ++$nxt_id;

            if ( @data == CHUNK_SIZE ) {
                $que_in->await(50);   # limit
                $que_in->enqueue( encode_sereal(\@data) );
                @data = ();
            }
        }

        $que_in->enqueue( encode_sereal(\@data) ) if @data;
    }

    sub task {
        while ( defined ( my $frozen = $que_in->dequeue ) ) {
            my $array_ref = decode_sereal($frozen);
            my $output = '';

            for ( 0 .. $#{ $array_ref } ) {
                $output .= $array_ref->[$_] . "\n";
            }

            $que_out->enqueue($output);
        }
    }

    sub output {
        open my $fh_out, ">:utf8", "output.txt";

        while ( defined ( my $output = $que_out->dequeue ) ) {
            print {$fh_out} $output;
        }

        close $fh_out;
    }

    Update:

    Workers may write directly to the output handle, one at a time. This is possible with a mutex. Do not forget to enable autoflush on the file handle. Recent versions of MCE and MCE::Shared load IO::Handle automatically if the autoflush method is not found.

    use strict; use warnings;

    use MCE::Hobo;
    use MCE::Mutex;
    use MCE::Shared;
    use Sereal qw( decode_sereal encode_sereal );
    use Time::HiRes qw( time );

    use constant {
        MAX_WORKERS => 3,
        CHUNK_SIZE  => 5,
    };

    # usage: script_que_hobo2.pl [ N ]

    my ( @wrks_in, @wrks_out );

    my $que_in = MCE::Shared->queue( await => 1 );
    my $mutex  = MCE::Mutex->new();

    my $start = time;

    open my $fh_out, ">:utf8", "output.txt";
    $fh_out->autoflush(1);

    push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;

    input( shift || 1000 );

    $que_in->end;
    $_->join for @wrks_in;

    close $fh_out;

    printf "duration: %0.3f\n", time - $start;

    sub input {
        my $max_id = shift;
        my $nxt_id = 0;
        my @data;

        while ( 1 ) {
            last if $nxt_id >= $max_id;
            push @data, ++$nxt_id;

            if ( @data == CHUNK_SIZE ) {
                $que_in->await(50);   # limit
                $que_in->enqueue( encode_sereal(\@data) );
                @data = ();
            }
        }

        $que_in->enqueue( encode_sereal(\@data) ) if @data;
    }

    sub task {
        while ( defined ( my $frozen = $que_in->dequeue ) ) {
            my $array_ref = decode_sereal($frozen);
            my $output = '';

            for ( 0 .. $#{ $array_ref } ) {
                $output .= $array_ref->[$_] . "\n";
            }

            $mutex->enter(sub {
                print {$fh_out} $output;
            });
        }
    }

    Regards, Mario

Re^3: MCE: Slow IPC between child and gather process in parent
by learnedbyerror (Monk) on May 07, 2018 at 12:59 UTC

    Please see my update 4 above. Chunking definitely helped.

    lbe

