Re: MCE: Slow IPC between child and gather process in parent

by marioroy (Priest)
on May 02, 2018 at 14:47 UTC


in reply to MCE: Slow IPC between child and gather process in parent

Hi learnedbyerror,

How is $output defined and populated? It's not shown in the code. Also, since you are on a Mac, is the folder where the output file resides excluded from Spotlight indexing? E.g. Apple Menu -> System Preferences -> Spotlight -> Privacy tab.

MCE Relay may be helpful for factoring out freeze/thaw operations on large data. Please find below two demonstrations that solely test IPC. The Gather and Relay demonstrations take 11.3 and 12.1 seconds respectively for 1 million iterations; both handle more than 50k iterations per second.

Iterator & Gather demonstration

use strict; use warnings;

use MCE;
use MCE::Candy;
use Time::HiRes 'time';

# usage: script_gather.pl [ N ]

open my $fh_out, ">:utf8", "output.txt";

my $mce = MCE->new(
    gather => MCE::Candy::out_iter_fh($fh_out),
    max_workers => 3,
    user_func => \&user_func,
)->spawn;

my $start = time;

$mce->process( make_iterator( shift || 1000 ) );

my $duration = time - $start;

$mce->shutdown;
close $fh_out;

printf "duration: %0.3f\n", $duration;

sub make_iterator {
    my $max_id = shift;
    my $nxt_id = 0;

    return sub {
        return if $nxt_id >= $max_id;
        return ++$nxt_id;
    };
}

sub user_func {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;
    my $output = $chunk_ref->[0]."\n";

    MCE->gather($chunk_id, $output);
}

Iterator & Relay demonstration

use strict; use warnings;

use MCE;
use Time::HiRes 'time';

# usage: script_relay.pl [ N ]

open my $fh_out, ">:utf8", "output.txt";
$fh_out->autoflush(1);

my $mce = MCE->new(
    init_relay => 0,
    max_workers => 3,
    user_func => \&user_func,
)->spawn;

my $start = time;

$mce->process( make_iterator( shift || 1000 ) );

my $duration = time - $start;

$mce->shutdown;
close $fh_out;

printf "duration: %0.3f\n", $duration;

sub make_iterator {
    my $max_id = shift;
    my $nxt_id = 0;

    return sub {
        return if $nxt_id >= $max_id;
        return ++$nxt_id;
    };
}

sub user_func {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;
    my $output = $chunk_ref->[0]."\n";

    MCE::relay { print {$fh_out} $output };
}

Update: Changed max_workers from 'auto' to 3 above. Basically, try 3 to 5 workers, since the OP's example involves lots of IO. The MCE module was written to handle thousands of IPC requests per second. For examples like these, which make tens of thousands of IPC requests, limiting the number of workers allows the OS to allocate more CPU time to the manager process. Chunking is another way; see the sketch below.
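
For comparison only, chunking is also available through the higher-level MCE::Loop interface, which hands each worker a whole chunk at a time. This is just a minimal sketch and is not one of the demonstrations above; the chunk_size and max_workers values are illustrative, not tuned for the OP's workload.

use strict; use warnings;

use MCE::Loop;

# illustrative settings only; tune for the real workload
MCE::Loop::init {
    max_workers => 3, chunk_size => 100
};

# each worker receives an entire chunk, so one gather call covers many items
my %result = mce_loop {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;
    my %ret;
    $ret{$_} = $_ * 2 for @{ $chunk_ref };
    MCE->gather(%ret);   # keyed results, so arrival order does not matter
} 1 .. 10_000;

printf "gathered %d results\n", scalar keys %result;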

Regards, Mario

Re^2: MCE: Slow IPC between child and gather process in parent
by learnedbyerror (Monk) on May 02, 2018 at 16:41 UTC

    Please see my update above

    I am relieved that things are working up to my projected speed based on earlier tests. One of the reasons I moved to MCE, rather than writing my own fork/threads implementation or using other CPAN modules/frameworks, is the relative speed of IPC with MCE. So I was pretty worried about what I saw. My only remaining wish is to find out what changed and caused the problem in the first place.

    Marioroy, thank you for your contribution of MCE! While I agree with Discipulus, zentara, BrowserUK, and other monks that developers should understand how to parallelize without depending on a framework, and I can and have, I appreciate not having to think so hard about that part or to write tests specifically to validate it. As it happens, most of my parallelization needs have patterns that fit pretty cleanly into your framework, so there is minimal need for me to change my approach.

    And with your addition of MCE::Flow, I'm actually looking at rewriting one of my existing workflows, which currently takes over 4 hours to run. I had hesitated to change it previously because of concerns about how much coordination code I would need to write to pipeline all of the pieces together. I believe Flow will do much of that for me.
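
    For reference, a minimal MCE::Flow pipeline, along the lines of the queue-based pattern in the MCE::Flow documentation, might look like the sketch below. The stage names, worker counts, and work items are illustrative only, not my actual workflow.

    use strict; use warnings;

    use MCE::Flow;
    use MCE::Queue;

    # hypothetical two-stage pipeline: stage_a produces, stage_b consumes
    my $queue = MCE::Queue->new;

    MCE::Flow::init {
        task_end => sub {
            my ( $mce, $task_id, $task_name ) = @_;
            # when all stage_a workers finish, release the two stage_b workers
            $queue->enqueue( (undef) x 2 ) if $task_name eq 'stage_a';
        }
    };

    mce_flow {
        task_name   => [ 'stage_a', 'stage_b' ],
        max_workers => [ 1, 2 ],
    },
    sub {
        # stage_a: produce work items
        $queue->enqueue($_) for 1 .. 20;
    },
    sub {
        # stage_b: consume until the undef markers arrive
        while ( defined ( my $item = $queue->dequeue ) ) {
            MCE->say("processed $item");
        }
    };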

    Thanks again!

    lbe

Re^2: MCE: Slow IPC between child and gather process in parent
by marioroy (Priest) on May 06, 2018 at 19:20 UTC

    Hi again,

    Disclosure: I tried various things in case the application is IPC-bound, which may not be the case for the OP.

    Chunking reduces the number of IPC operations, which relieves pressure on the manager process. Please find the same demonstrations with chunking applied. Basically, chunking requires looping inside the iterator and user_func. In other words, chunking is a way to keep the application from becoming IPC-bound.

    Iterator & Gather demonstration

    use strict; use warnings;

    use MCE;
    use MCE::Candy;
    use Time::HiRes 'time';

    # usage: script_gather2.pl [ N ]

    open my $fh_out, ">:utf8", "output.txt";

    my $mce = MCE->new(
        gather => MCE::Candy::out_iter_fh($fh_out),
        max_workers => 3,
        chunk_size => 5,
        user_func => \&user_func,
    )->spawn;

    my $start = time;

    $mce->process( make_iterator( shift || 1000 ) );

    my $duration = time - $start;

    $mce->shutdown;
    close $fh_out;

    printf "duration: %0.3f\n", $duration;

    sub make_iterator {
        my $max_id = shift;
        my $nxt_id = 0;

        # closure block receives chunk_size value
        return sub {
            my ( $chunk_size, @data ) = @_;
            return if $nxt_id >= $max_id;

            for ( 1 .. $chunk_size ) {
                push @data, ++$nxt_id;
                last if $nxt_id >= $max_id;
            }

            return @data;
        };
    }

    sub user_func {
        my ( $mce, $chunk_ref, $chunk_id ) = @_;
        my $output = '';

        # chunking requires looping inside user_func
        for ( 0 .. $#{ $chunk_ref } ) {
            $output .= $chunk_ref->[$_]."\n";
        }

        # gather with chunk_id must be called one time only
        MCE->gather($chunk_id, $output);
    }

    Iterator & Relay demonstration

    use strict; use warnings;

    use MCE;
    use Time::HiRes 'time';

    # usage: script_relay2.pl [ N ]

    open my $fh_out, ">:utf8", "output.txt";
    $fh_out->autoflush(1);

    my $mce = MCE->new(
        init_relay => 0,
        max_workers => 3,
        chunk_size => 5,
        user_func => \&user_func,
    )->spawn;

    my $start = time;

    $mce->process( make_iterator( shift || 1000 ) );

    my $duration = time - $start;

    $mce->shutdown;
    close $fh_out;

    printf "duration: %0.3f\n", $duration;

    sub make_iterator {
        my $max_id = shift;
        my $nxt_id = 0;

        # closure block receives chunk_size value
        return sub {
            my ( $chunk_size, @data ) = @_;
            return if $nxt_id >= $max_id;

            for ( 1 .. $chunk_size ) {
                push @data, ++$nxt_id;
                last if $nxt_id >= $max_id;
            }

            return @data;
        };
    }

    sub user_func {
        my ( $mce, $chunk_ref, $chunk_id ) = @_;
        my $output = '';

        # chunking requires looping inside user_func
        for ( 0 .. $#{ $chunk_ref } ) {
            $output .= $chunk_ref->[$_]."\n";
        }

        # relay must be called one time only
        MCE::relay { print {$fh_out} $output };
    }

    Regards, Mario

      Hi again,

      Disclosure: I also tried chunking with threads and MCE::Hobo using a queue, for comparison with the MCE (Iterator & Gather/Relay) demonstrations.

      Please find demonstrations below using threads and MCE::Hobo for testing IPC overhead. On Unix platforms (particularly BSD variants), the MCE solution may run up to 2x faster than threads. Increasing CHUNK_SIZE decreases IPC overhead but increases the memory footprint of the $output variable. Chunking/batching is helpful; just be mindful of memory consumption in the output routine.

      Regarding the input queue, I limit it to 50 items to keep it from growing out of control. Thread::Queue must be a recent version (3.07 or later) with the limit capability, or the script will fail.

      Threads & Thread::Queue->new() demonstration

      use strict; use warnings;

      use threads;
      use Thread::Queue 3.07;
      use Sereal qw( decode_sereal encode_sereal );
      use Time::HiRes qw( time );

      use constant {
          MAX_WORKERS => 3,
          CHUNK_SIZE  => 5,
      };

      # usage: script_que_thrs.pl [ N ]

      my ( @wrks_in, @wrks_out );

      my $que_in = Thread::Queue->new();
      $que_in->limit = 50;

      my $que_out = Thread::Queue->new();

      my $start = time;

      push @wrks_in,  threads->create(\&task) for 1 .. MAX_WORKERS;
      push @wrks_out, threads->create(\&output);

      input( shift || 1000 );

      $que_in->end;
      $_->join for @wrks_in;

      $que_out->end;
      $_->join for @wrks_out;

      printf "duration: %0.3f\n", time - $start;

      sub input {
          my $max_id = shift;
          my $nxt_id = 0;
          my @data;

          while ( 1 ) {
              last if $nxt_id >= $max_id;
              push @data, ++$nxt_id;

              if ( @data == CHUNK_SIZE ) {
                  $que_in->enqueue( encode_sereal(\@data) );
                  @data = ();
              }
          }

          $que_in->enqueue( encode_sereal(\@data) ) if @data;
      }

      sub task {
          while ( defined ( my $frozen = $que_in->dequeue ) ) {
              my $array_ref = decode_sereal($frozen);
              my $output = '';

              for ( 0 .. $#{ $array_ref } ) {
                  $output .= $array_ref->[$_]."\n";
              }

              $que_out->enqueue($output);
          }
      }

      sub output {
          open my $fh_out, ">:utf8", "output.txt";

          while ( defined ( my $output = $que_out->dequeue ) ) {
              print {$fh_out} $output;
          }

          close $fh_out;
      }

      MCE::Hobo & MCE::Shared->queue() demonstration

      use strict; use warnings;

      use MCE::Hobo;
      use MCE::Shared;
      use Sereal qw( decode_sereal encode_sereal );
      use Time::HiRes qw( time );

      use constant {
          MAX_WORKERS => 3,
          CHUNK_SIZE  => 5,
      };

      # usage: script_que_hobo.pl [ N ]

      my ( @wrks_in, @wrks_out );

      my $que_in  = MCE::Shared->queue( await => 1 );
      my $que_out = MCE::Shared->queue();

      my $start = time;

      push @wrks_in,  MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
      push @wrks_out, MCE::Hobo->create(\&output);

      input( shift || 1000 );

      $que_in->end;
      $_->join for @wrks_in;

      $que_out->end;
      $_->join for @wrks_out;

      printf "duration: %0.3f\n", time - $start;

      sub input {
          my $max_id = shift;
          my $nxt_id = 0;
          my @data;

          while ( 1 ) {
              last if $nxt_id >= $max_id;
              push @data, ++$nxt_id;

              if ( @data == CHUNK_SIZE ) {
                  $que_in->await(50);   # limit
                  $que_in->enqueue( encode_sereal(\@data) );
                  @data = ();
              }
          }

          $que_in->enqueue( encode_sereal(\@data) ) if @data;
      }

      sub task {
          while ( defined ( my $frozen = $que_in->dequeue ) ) {
              my $array_ref = decode_sereal($frozen);
              my $output = '';

              for ( 0 .. $#{ $array_ref } ) {
                  $output .= $array_ref->[$_]."\n";
              }

              $que_out->enqueue($output);
          }
      }

      sub output {
          open my $fh_out, ">:utf8", "output.txt";

          while ( defined ( my $output = $que_out->dequeue ) ) {
              print {$fh_out} $output;
          }

          close $fh_out;
      }

      Update:

      Workers may write directly to the output handle, one at a time. This is possible with a mutex. Do not forget to enable autoflush on the file handle. Recent versions of MCE and MCE::Shared load IO::Handle automatically in the event the autoflush method is not found.

      use strict; use warnings;

      use MCE::Hobo;
      use MCE::Mutex;
      use MCE::Shared;
      use Sereal qw( decode_sereal encode_sereal );
      use Time::HiRes qw( time );

      use constant {
          MAX_WORKERS => 3,
          CHUNK_SIZE  => 5,
      };

      # usage: script_que_hobo2.pl [ N ]

      my ( @wrks_in, @wrks_out );

      my $que_in = MCE::Shared->queue( await => 1 );
      my $mutex  = MCE::Mutex->new();

      my $start = time;

      open my $fh_out, ">:utf8", "output.txt";
      $fh_out->autoflush(1);

      push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;

      input( shift || 1000 );

      $que_in->end;
      $_->join for @wrks_in;

      close $fh_out;

      printf "duration: %0.3f\n", time - $start;

      sub input {
          my $max_id = shift;
          my $nxt_id = 0;
          my @data;

          while ( 1 ) {
              last if $nxt_id >= $max_id;
              push @data, ++$nxt_id;

              if ( @data == CHUNK_SIZE ) {
                  $que_in->await(50);   # limit
                  $que_in->enqueue( encode_sereal(\@data) );
                  @data = ();
              }
          }

          $que_in->enqueue( encode_sereal(\@data) ) if @data;
      }

      sub task {
          while ( defined ( my $frozen = $que_in->dequeue ) ) {
              my $array_ref = decode_sereal($frozen);
              my $output = '';

              for ( 0 .. $#{ $array_ref } ) {
                  $output .= $array_ref->[$_]."\n";
              }

              $mutex->enter(sub { print {$fh_out} $output; });
          }
      }

      Regards, Mario

      Please see my update 4 above. Chunking definitely helped.

      lbe

