MCE: Slow IPC between child and gather process in parent

by learnedbyerror (Monk)
on May 02, 2018 at 05:26 UTC

learnedbyerror has asked for the wisdom of the Perl Monks concerning the following question:

Oh Monks,

The village idiot returneth and seeks your wisdom yet again, especially that of Monk marioroy.

I have a reasonably large dataset of html files (+/- 950K files, average size 23K, total size 21G) which I need to parse, manipulate and save the processed output to a simple text file. Given the volume, I decided to take a parallelized approach to this. For the past several years, I have used MCE for tasks such as this. I was able to quickly get a working solution up and running that averages 380 seconds of clock time start to finish. This is plenty fast enough to meet my needs. The generated output file is 7.0 GB in size with approximately 25MM rows.

The following is the extracted/condensed code of the meat of the process. In addition to the modules shown below, I also have Sereal Encoder/Decoder installed. Installation of Sereal knocked 40 secs off the initial run time.

use File::Map qw(map_file);
use MCE;
use MCE::Candy;
use Path::Iterator::Rule;

my $rule     = Path::Iterator::Rule->new->file->name(qr/[.](html)$/);
my $iterator = $rule->iter_fast("topdir");

open my $fh_out, ">:utf8", $self->fn_out;

my $mce = MCE->new(
    gather      => MCE::Candy::out_iter_fh($fh_out),
    max_workers => 'auto',
    user_func   => \&parse_file,
)->spawn;

$mce->process($iterator);

sub parse_file {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;

    map_file my $text, $chunk_ref->[0], '<';

    my (@posts) = $text =~ m/
        \<\!--XXXX:\ (\d+)--\>\<\!--YYYY:\ (\d+).+?
        (?:<\!--AAAAA--\>(.*?)\<\!--\/AAAAA--\>|)
        \<\!--BBBB--\>(.*?)\<\!--\/BBBB--\>.+?
        \<\!--CCCC--\>(.+?)\<\!--\/CCCC--\>
    /msgx;

    # ... do some stuff with @posts and place results in multiline string $output

    $mce->gather( $chunk_id, $output );
}

But given a little bit of boredom, I decided to investigate to see just how efficient this is. My first look was at htop while the program was running. As expected, it kicked off 8 forked processes on my Mac with an i7 proc. However, I noticed that none of these were running at capacity. Usually they were hitting 60 – 70% utilization per logical core. This led me to conclude that I probably have an I/O bottleneck somewhere.

I ran the following tests:

  1. Immediately following the map_file call, I added my $t = $text; return; to let me see how quickly I could read all of the data (sketched just after this list). This finished consistently in 160 secs.
  2. Similarly, I placed a return; just before the gather call to let me see the total read-plus-processing time. Surprisingly, this only added 5 seconds, for a total of 165 secs. I gained even more respect for the perl Regexp engine and its efficiency when working with a memory-mapped file.
  3. Wrote a simple program to generate 7 GB of data with characteristics similar to the output file referenced above and wrote this to disk. This ran consistently at 15 seconds.
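As a sketch (not the actual code), the two early returns sat inside parse_file roughly like this:

use File::Map qw(map_file);

sub parse_file {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;

    map_file my $text, $chunk_ref->[0], '<';

    my $t = $text;   # test 1: the copy forces the mapped pages to actually be read
    return;          # test 1 stops here: read-only timing, ~160 secs total

    my $output = ''; # ... regexp match and analytics appending to $output ...

    # return;        # test 2 stops here instead: read + processing, ~165 secs total

    $mce->gather( $chunk_id, $output );
}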

Doing the math ( 380 – 160 – 5 – 15 = 200 ) leaves me believing that the gap of 200 seconds is the time required to move the data from the child process back to the parent. This seems large to me. I based this on:

  1. I can read 21GB from disk, an SSD, in 160 secs.
  2. I can write 7GB to disk, same SSD, in 15 secs.
  3. Shouldn’t an in-memory transfer be closer to 15 seconds than 160?

I am in general familiar with the woes of IPC speed. I have other parallelized programs that I have written where I had to use something like a finely tuned BerkeleyDB implementation for the purpose of IPC. Most of those projects were much larger and had a much more complex analytical pattern, oftentimes requiring multiple programs to communicate with each other. But I still wonder whether there is an issue with the IPC between the MCE child and parent processes.

So my question(s) are:

  1. Is this as good as it gets for MCE?
  2. Should I create an additional process with MCE::Hobo and use MCE::Queue to move the data from the child to the file writer?
  3. Am I missing something?

Thanks in advance for the help!

lbe

UPDATE: May 2, 2018 11:12 GMT-6

Now I really feel like the village idiot. I have been running this code and testing against various perl versions and none of them have finished in less than about 380 seconds ... until this morning. When I read anonymous monk's question regarding threads, I reran the test and it ran in 288 seconds (about 8 seconds above my theoretical number above). I thought wow, I may be onto something. When I went back and tested without threads, meaning using fork, it now ran in 278 seconds. Both of these tests were on perl 5.26.2 with threads compiled. I reran the same test on 5.26.2 without threads compiled and got about the same numbers. Last night, my last run was on 5.27.11 (a dev release). I re-ran both with and without threads and got essentially the same times.

I have checked and compared my current program file with my last commit yesterday and, other than the addition of use threads for the threaded test, there is no difference. I have also validated that nothing has changed in the source and that the generated results match my previous results. There is some bad juju, or maybe I should say good juju given the performance improvement, going on somewhere. I'll continue to run more tests and will update the thread if I learn anything new.

Thank You! to all of you that responded!

lbe

UPDATE2: May 3, 2018 15:21 GMT-6

I'm adding this update (really a reply) here since it is in response to several of the threads below, and I thought it would read better here instead of getting lost at too low a level. Thanks to all of you for your additional input. I'm devising some tests to get some additional data to help profile duration and throughput. I will share my findings when I complete them. It will likely take me a couple of days given my immediate workload.

In the meantime, I have bad news. When I went through the faster tests line by line, I found an error in my code. The actual run times are what I had first posted. All of my benchmarks on perl 5.26.2 run for 358 – 380 secs. There is still a delay of approximately 100 seconds that I can currently only explain with fear, uncertainty and doubt. marioroy, I will follow up on your suggestions now that I know this.

One clarification: the read processes are independent of each other. In the execution of the parse_file function, one file is read, analytics are computed and one multiline gather/print is used to persist the results. This program will never read the same file more than once. File selection order is pseudo-random in that a file is processed in the order returned from Path::Iterator::Rule. The read from the file is, at a high level, a single read - meaning either a file slurp into a string variable or execution of a regexp match against the map_file. I need to investigate block sizes on the SSD and how Apple APFS handles reads, and calculate statistics on file size distribution, to estimate how efficient or inefficient the SSD may be in what it has to read vs. what it transfers.

I have previously run Devel::NYTProf on the single threaded version and made some modifications to reduce the computation time. I also changed from open to map_file which resulted in a modest time reduction for reading. I do want to test the assertion that things could possibly be slower with map_file when running in parallel.

Unfortunately, I know of no package that can profile this while running at or nearly at speed. As such, I am going to have to instrument the code to record timings and volumes read/written in a way that minimizes impact on the execution profile - yes I know, Heisenberg Uncertainty Principle.
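A minimal sketch of that kind of low-overhead instrumentation, assuming a timed() wrapper and phase names chosen for illustration rather than the actual instrumented code:

use strict;
use warnings;
use Time::HiRes qw(time);

my %t;    # per-worker accumulators: seconds spent in each phase

# wrap a phase of work and accumulate its wall-clock time
sub timed {
    my ( $phase, $code ) = @_;
    my $t0  = time;
    my @ret = $code->();
    $t{$phase} += time - $t0;
    return wantarray ? @ret : $ret[0];
}

# each forked worker reports its totals when it exits
END {
    printf STDERR "pid %d: %s\n", $$,
        join ', ', map { sprintf '%s=%.1fs', $_, $t{$_} } sort keys %t;
}

# inside user_func, the phases would be wrapped like:
#   my ($text) = timed( read => sub { ... map_file or slurp ... } );
#   my $output = timed( calc => sub { ... regexp and analytics ... } );
#   timed( gather => sub { $mce->gather( $chunk_id, $output ) } );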

Lastly, I conducted some read benchmarking using fgrep which I believe most people will agree is pretty fast at reading data across multiple files in a single process. My methodology is:

  1. Create a list of directories 3 levels below the top of the directory structure of interest. This list contains 1,107 entries.
  2. Pipe this list into xargs using -P to control the number of forked processes running at a time. This will fork more processes than the MCE approach, but that should be negligible within the overall runtime.
  3. Call fgrep -r '.' level_3_directory. fgrep will recurse down through the tree and cat the file contents to STDOUT.
  4. Pipe the output from all of the processes to pv to record the time and throughput.

NOTE: The machine on which this is running has 16 GB of RAM. The total bytes read in each run is 21 GB, so the file cache will be overrun in a single run, eliminating any cache assistance from run to run. I made runs from 8 processes down to 1. I ran the 8-process run twice and threw out the first one to make sure that there was no cache assistance.

The command to accomplish this is:

find directory_name -mindepth 3 -maxdepth 3 -print | xargs -L1 -P 8 fgrep -r '.' | pv > /dev/null

Where 8 is the number of processes run.

The output of this command looks like:

18.6GiB 0:01:43 [ 184MiB/s]

The results are:

CPU Count   Total Throughput Rate (MB/sec)   Per Processor Throughput Rate (MB/sec)   Per Processor Efficiency Relative to 1 CPU
    1                    31                                  31                                       100%
    2                    61                                  31                                       100%
    3                    85                                  28                                        92%
    4                   106                                  27                                        86%
    5                   117                                  23                                        76%
    6                   135                                  23                                        76%
    7                   149                                  21                                        69%
    8                   174                                  22                                        71%

My initial interpretation of these results is that my code is not IO bound. With 8 fgrep processes running, the total time to read the files is 103 secs, whereas my best read time is 160 secs when I use 8 processes. I will experiment to see if I can get this reduced any further.

I will update this post in another 2 or 3 days once I have additional information

Thanks!

lbe

UPDATE3: May 6, 2018 00:30 GMT-6

I have instrumented the code and surprisingly don’t see any measurable impact on overall run time. Maybe Heisenberg doesn’t apply here :). The processor throughputs posted in Update2 are consistent with my new measurements. On this i7, my overall runtime decreases until I reach 8 workers. I increased from 9 to 12 workers and saw approximately the same run times as with 8. This is consistent with my expectations since the i7 has 8 logical cores.

The overall run times are:

  • 382 secs – MCE, 8 workers, read with map_file
  • 1,822 secs – single process, read with map_file

* map_file is 15% faster than using a single-line slurp-style read
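For reference, a comparison along these lines can be sketched with the core Benchmark module; the sample file argument and the stand-in regexp are assumptions, not the actual test harness:

use strict;
use warnings;
use Benchmark qw(cmpthese);
use File::Map qw(map_file);

my $fn = shift or die "usage: $0 sample.html\n";   # any one of the html files

cmpthese( -5, {
    map_file => sub {
        map_file my $text, $fn, '<';
        my $n = () = $text =~ /<!--/g;   # stand-in for the real match
    },
    slurp => sub {
        open my $fh, '<', $fn or die $!;
        local $/;                        # slurp mode
        my $text = <$fh>;
        my $n = () = $text =~ /<!--/g;
    },
});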

The breakdown of the run time for the 382 secs above is:

  • 48% - Read
  • 13% - Calculations
  • 4% - Write
  • 20% - Path::Iterator::Rule
  • 15% - Overhead – MCE, context switches …

I ran the above tests with everything closed on the Mac and with it disconnected from the network to minimize any competition for CPU or I/O cycles. All processes ran in memory without swapping to disk. There is 16 GB of RAM in this machine, only 9 GB were in use while the program ran.

The file statistics are:

  • File Count = 949,670
  • Min File Size = 189 B
  • Average File Size = 22,178 B
  • Max File Size = 985,373 B
  • Std Dev of Files Size = 23,262 B
  • 80% of the files are < 30,000 B
  • 90% of the files are < 50,000 B

My interpretation of the data that I have gathered is:

  1. When running a single process, the limiting factor is the alternating read/write I/O
  2. When using MCE, overall run time reduces and aggregate I/O increases until the number of worker processes equals the number of virtual cores
  3. The impact of time required for IPC is less than 10% (38 secs) of the overall run time. This is much more than offset by the 79% (1,440 sec) reduction in overall run time.
  4. map_file is approximately 15% faster than PerlIO for this directory structure and file size distribution
  5. Approximately another 12% of run time can be saved by using the unix find command to prefetch the names of the files of interest instead of using Path::Iterator::Rule
  6. Overall, I am satisfied that this set of code is reasonably optimized as it exists.
  7. Further improvement in run time would require moving to something like BerkeleyDB, possibly using Sereal with compression enabled, to reduce disk I/O and eliminate nearly one million file opens and closes (a rough sketch follows this list).
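A rough sketch of item 7, assuming the BerkeleyDB and Sereal::Encoder modules; the corpus.db path, key scheme and store_file() helper are hypothetical:

use strict;
use warnings;
use BerkeleyDB;
use Sereal::Encoder qw(SRL_ZLIB);

# pack the ~950K small files into one BerkeleyDB file, Sereal-compressed,
# trading a one-time load cost for the elimination of per-file open/close
my $db = BerkeleyDB::Hash->new(
    -Filename => 'corpus.db',            # hypothetical path
    -Flags    => DB_CREATE,
) or die "BerkeleyDB: $BerkeleyDB::Error";

my $enc = Sereal::Encoder->new( { compress => SRL_ZLIB } );

sub store_file {
    my ( $key, $text ) = @_;             # key scheme up to the caller
    $db->db_put( $key, $enc->encode($text) );
}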

Thanks to all of you who asked questions and provided input. Most special thanks to marioroy for his response and for MCE!

lbe

UPDATE4: May 7, 2018 08:00 GMT-6

Hello marioroy,

I took your advice and created a chunking iterator and, in short, have a significant improvement. I decided to deviate from PIR for now, cheat, and create an iterator based on the Mac’s native find command. The iterator code is:

use File::Which qw(which);

sub get_dir_file_list_iter {
    my $dir  = shift;
    my $FIND = which 'find';
    my $CMD  = qq/$FIND -L "$dir" -type f -print/;

    # the pipe open was dropped from the original post; it is needed for <$FH> below
    open my $FH, '-|', $CMD or die "Cannot run '$CMD': $!";

    return (
        sub {
            my $chunk_size = shift // 1;
            my @ary;

            while ( my $fn = <$FH> ) {
                chomp $fn;
                push( @ary, $fn );
                last if @ary == $chunk_size;
            }

            return (@ary);
        }
    );
}
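Called directly, the iterator behaves as below; MCE->process drives it the same way, passing the configured chunk_size as the argument:

my $iter = get_dir_file_list_iter('topdir');   # hypothetical top directory

while ( my @files = $iter->(250) ) {           # fetch up to 250 names per call
    printf "chunk of %d file names\n", scalar @files;
}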

Let me try to cut off some of the flames on calling an external program to do something that could be done in pure perl. At this point, I am trying to optimize speed. In the vast majority of my perl development, I color inside of the lines; however, at times when performance is my main concern, I cheat and leverage executables outside of perl that are optimized for a specific role. find is one of those. I recognize that there are potential problems with unanticipated side effects such as zombie processes, race conditions ... In this case, I have decided to accept these risks as this approach reduces iteration clock time in this app from ~60 seconds to ~20 seconds based upon instrumented timing. In general, I advocate using perlish tools like PIR and File::Find.

My MCE code now looks like:

use MCE;
use MCE::Candy;

my $iterator = get_dir_file_list_iter($dir);

open my $fh_out, ">", $fn_out;

my $mce = MCE->new(
    gather      => MCE::Candy::out_iter_fh($fh_out),
    chunk_size  => $iter_file_ct,
    max_workers => $max_workers,
    user_func   => \&parse_files,
)->spawn;

$mce->process($iterator);
$mce->shutdown();
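The chunked parse_files worker is not shown above; a hedged sketch of its shape, with process_one() as a hypothetical stand-in for the real per-file analytics:

use File::Map qw(map_file);

sub parse_files {
    my ( $mce, $chunk_ref, $chunk_id ) = @_;
    my $output = '';

    # loop over the chunk of file names, appending each file's result rows
    for my $fn ( @{ $chunk_ref } ) {
        map_file my $text, $fn, '<';
        $output .= process_one( $fn, $text );   # hypothetical helper
    }

    # gather exactly once per chunk_id so MCE::Candy emits output in order
    $mce->gather( $chunk_id, $output );
}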

With respect to overall run time, with the find-based iterator and a chunk_size of 250, I am down to ~300 secs from my original ~380 secs. I have not done sufficient testing to validate what contributed to the specific reduction. I have created a shell script to run benchmarks based upon a number of different variations and will update once it completes.

marioroy, I had already been thinking about using MCE::Hobo and MCE::Queue to do something similar to your suggestions in Re^3: MCE: Slow IPC between child and gather process in parent. I will try this variation once the above testing completes.

Thanks for your guidance and willingness to help!

lbe

UPDATE5: May 7, 2018 23:00 GMT-6

OK, I ran some benchmarks today. My observations based upon them are:

  • 380 sec – My base time before engaging in this optimization exercise
  • 360 sec – 20 sec improvement when I changed the read process from open to File::Map's map_file

  • 325 sec – 35 sec improvement when I moved from using PIR to using my custom iterator based upon the Mac’s find binary
  • 290 sec – 35 sec improvement when implementing chunking. For this machine, code and data, the sweet spot is around 500 file names per chunk. This results in each gather call transferring ~4 MB of data as opposed to the one-file chunk of ~8 KB
  • 270 sec – the average clock time spent in each process processing files. The overhead for MCE IPC is down to 20 secs (290 - 270)
  • 210 sec – the average time each process spends reading the file into memory and applying the first regexp. This is about 40 secs slower than my map_file speed test described above.
  • 74 MB/sec – the average read speed (total file size / elapsed time)

I think this may be about as good as I am going to get on this system unless I can find a way to read the data from the disk faster. The read throughput is less than half of what I was able to achieve with 8 processes running recursive fgrep.

At this point, I am going to close my testing insofar as updating this thread. I don't think I will get much more speed out of using MCE::Hobo and MCE::Queue, though I will give it a try. I'll also perform some additional benchmarking on reading to see if other options like File::Slurper or sysopen/sysread are any faster (a rough sketch of the latter follows).
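For the sysopen/sysread option, a minimal sketch of a raw-syscall slurp that could be benchmarked against map_file (the helper name is mine):

use strict;
use warnings;
use Fcntl qw(O_RDONLY);

# slurp a file via raw syscalls, bypassing PerlIO buffering
sub sysread_slurp {
    my ($fn) = @_;
    sysopen my $fh, $fn, O_RDONLY or die "$fn: $!";

    my $size = -s $fh;
    my $buf  = '';
    my $off  = 0;

    while ( $off < $size ) {
        my $n = sysread $fh, $buf, $size - $off, $off;
        die "$fn: $!" unless defined $n;
        last if $n == 0;    # unexpected EOF
        $off += $n;
    }

    return \$buf;
}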

Thanks to all for your comments and advice, and a special thanks to marioroy for chiming in with guidance on MCE.

lbe

Re: MCE: Slow IPC between child and gather process in parent
by marioroy (Prior) on May 02, 2018 at 14:47 UTC

    Hi learnedbyerror,

    How is $output defined and populated? It's not shown in the code. Being on a Mac, is the folder where the output file resides disabled from Spotlight indexing? E.g. Apple Menu -> System Preferences -> Spotlight -> Privacy Tab

    MCE Relay may be helpful to factor out freeze/thaw operations for large data. Please find below two demonstrations for solely testing IPC. The Gather and Relay demonstrations take 11.3 and 12.1 seconds respectively for 1 million iterations. Both handle above 50k iterations per second.

    Iterator & Gather demonstration

    use strict;
    use warnings;

    use MCE;
    use MCE::Candy;
    use Time::HiRes 'time';

    # usage: script_gather.pl [ N ]

    open my $fh_out, ">:utf8", "output.txt";

    my $mce = MCE->new(
        gather      => MCE::Candy::out_iter_fh($fh_out),
        max_workers => 3,
        user_func   => \&user_func,
    )->spawn;

    my $start = time;
    $mce->process( make_iterator( shift || 1000 ) );
    my $duration = time - $start;

    $mce->shutdown;
    close $fh_out;

    printf "duration: %0.3f\n", $duration;

    sub make_iterator {
        my $max_id = shift;
        my $nxt_id = 0;

        return sub {
            return if $nxt_id >= $max_id;
            return ++$nxt_id;
        };
    }

    sub user_func {
        my ( $mce, $chunk_ref, $chunk_id ) = @_;
        my $output = $chunk_ref->[0] . "\n";
        MCE->gather( $chunk_id, $output );
    }

    Iterator & Relay demonstration

    use strict;
    use warnings;

    use MCE;
    use Time::HiRes 'time';

    # usage: script_relay.pl [ N ]

    open my $fh_out, ">:utf8", "output.txt";
    $fh_out->autoflush(1);

    my $mce = MCE->new(
        init_relay  => 0,
        max_workers => 3,
        user_func   => \&user_func,
    )->spawn;

    my $start = time;
    $mce->process( make_iterator( shift || 1000 ) );
    my $duration = time - $start;

    $mce->shutdown;
    close $fh_out;

    printf "duration: %0.3f\n", $duration;

    sub make_iterator {
        my $max_id = shift;
        my $nxt_id = 0;

        return sub {
            return if $nxt_id >= $max_id;
            return ++$nxt_id;
        };
    }

    sub user_func {
        my ( $mce, $chunk_ref, $chunk_id ) = @_;
        my $output = $chunk_ref->[0] . "\n";
        MCE::relay { print {$fh_out} $output };
    }

    Update: Changed max_workers from 'auto' to 3 above. Basically, try 3 ~ 5 workers as the OP's example involves lots of IO. The MCE module was written in a way to handle thousands of IPC requests per second. For examples that do tens of thousands of IPC requests, limiting the number of workers allows the OS to allocate more CPU time to the manager process. Chunking is another way.

    Regards, Mario

      Please see my update above

      I am relieved that things are working up to my projected speed based upon tests. One of the reasons that I moved to MCE from writing my own fork/threads implementation or using other CPAN modules/frameworks is because of the relative speed of IPC with MCE. So I was pretty worried about what I saw. My only wish now is that I can find out what changed and caused a problem before.

      marioroy, thank you for your contribution of MCE! While I agree with Discipulus, zentara, BrowserUK and other monks that developers should understand how to parallelize without the dependency of a framework, and I can and have, I appreciate it when I don't have to think so much about this and develop tests to specifically validate that portion of code. As it happens, most of my parallelization needs have patterns that fit pretty cleanly into your framework, so there is minimal need for me to change my approach.

      And with your addition of MCE::Flow, I'm actually looking at rewriting one of my existing workflows that currently takes over 4 hours to run. I have hesitated to change it previously because of concerns about how much coordination code I would need to write to pipeline all of the pieces together. I believe that Flow will do much of that for me.

      Thanks again!

      lbe

      Hi again,

      Disclosure: I tried various things in the event the application is IPC-bound, which may not be the case for the OP.

      Chunking has the effect of reducing the number of IPC operations, thus relieving pressure on the manager process. Please find the same demonstrations with chunking applied. Basically, chunking requires looping inside the iterator and user_func. In other words, chunking is a way to prevent the application from becoming IPC-bound.

      Iterator & Gather demonstration

      use strict;
      use warnings;

      use MCE;
      use MCE::Candy;
      use Time::HiRes 'time';

      # usage: script_gather2.pl [ N ]

      open my $fh_out, ">:utf8", "output.txt";

      my $mce = MCE->new(
          gather      => MCE::Candy::out_iter_fh($fh_out),
          max_workers => 3,
          chunk_size  => 5,
          user_func   => \&user_func,
      )->spawn;

      my $start = time;
      $mce->process( make_iterator( shift || 1000 ) );
      my $duration = time - $start;

      $mce->shutdown;
      close $fh_out;

      printf "duration: %0.3f\n", $duration;

      sub make_iterator {
          my $max_id = shift;
          my $nxt_id = 0;

          # closure block receives chunk_size value
          return sub {
              my ( $chunk_size, @data ) = @_;
              return if $nxt_id >= $max_id;

              for ( 1 .. $chunk_size ) {
                  push @data, ++$nxt_id;
                  last if $nxt_id >= $max_id;
              }

              return @data;
          };
      }

      sub user_func {
          my ( $mce, $chunk_ref, $chunk_id ) = @_;
          my $output = '';

          # chunking requires looping inside user_func
          for ( 0 .. $#{ $chunk_ref } ) {
              $output .= $chunk_ref->[$_] . "\n";
          }

          # gather with chunk_id must be called one time only
          MCE->gather( $chunk_id, $output );
      }

      Iterator & Relay demonstration

      use strict;
      use warnings;

      use MCE;
      use Time::HiRes 'time';

      # usage: script_relay2.pl [ N ]

      open my $fh_out, ">:utf8", "output.txt";
      $fh_out->autoflush(1);

      my $mce = MCE->new(
          init_relay  => 0,
          max_workers => 3,
          chunk_size  => 5,
          user_func   => \&user_func,
      )->spawn;

      my $start = time;
      $mce->process( make_iterator( shift || 1000 ) );
      my $duration = time - $start;

      $mce->shutdown;
      close $fh_out;

      printf "duration: %0.3f\n", $duration;

      sub make_iterator {
          my $max_id = shift;
          my $nxt_id = 0;

          # closure block receives chunk_size value
          return sub {
              my ( $chunk_size, @data ) = @_;
              return if $nxt_id >= $max_id;

              for ( 1 .. $chunk_size ) {
                  push @data, ++$nxt_id;
                  last if $nxt_id >= $max_id;
              }

              return @data;
          };
      }

      sub user_func {
          my ( $mce, $chunk_ref, $chunk_id ) = @_;
          my $output = '';

          # chunking requires looping inside user_func
          for ( 0 .. $#{ $chunk_ref } ) {
              $output .= $chunk_ref->[$_] . "\n";
          }

          # relay must be called one time only
          MCE::relay { print {$fh_out} $output };
      }

      Regards, Mario

        Hi again,

        Disclosure: I also tried chunking with threads and MCE::Hobo using a queue, for comparison with the MCE (Iterator & Gather/Relay) demonstrations.

        Please find demonstrations using threads and MCE::Hobo for testing IPC overhead. On Unix platforms (particularly BSD variants), the MCE solution may run up to 2x faster than threads. Increasing CHUNK_SIZE will decrease IPC overhead but increase the memory footprint of the $output variable. Chunking/batching is helpful; just be mindful of memory consumption in the output routine.

        Regarding the input queue, I limit it to 50 items to prevent the queue from being populated out of control. Thread::Queue must be a recent version with the limit capability, or the script will fail.

        Threads & Thread::Queue->new() demonstration

        use strict;
        use warnings;

        use threads;
        use Thread::Queue 3.07;
        use Sereal qw( decode_sereal encode_sereal );
        use Time::HiRes qw( time );

        use constant {
            MAX_WORKERS => 3,
            CHUNK_SIZE  => 5,
        };

        # usage: script_que_thrs.pl [ N ]

        my ( @wrks_in, @wrks_out );

        my $que_in = Thread::Queue->new();
        $que_in->limit = 50;

        my $que_out = Thread::Queue->new();

        my $start = time;

        push @wrks_in,  threads->create(\&task) for 1 .. MAX_WORKERS;
        push @wrks_out, threads->create(\&output);

        input( shift || 1000 );

        $que_in->end;
        $_->join for @wrks_in;

        $que_out->end;
        $_->join for @wrks_out;

        printf "duration: %0.3f\n", time - $start;

        sub input {
            my $max_id = shift;
            my $nxt_id = 0;
            my @data;

            while ( 1 ) {
                last if $nxt_id >= $max_id;
                push @data, ++$nxt_id;

                if ( @data == CHUNK_SIZE ) {
                    $que_in->enqueue( encode_sereal(\@data) );
                    @data = ();
                }
            }

            $que_in->enqueue( encode_sereal(\@data) ) if @data;
        }

        sub task {
            while ( defined ( my $frozen = $que_in->dequeue ) ) {
                my $array_ref = decode_sereal($frozen);
                my $output    = '';

                for ( 0 .. $#{ $array_ref } ) {
                    $output .= $array_ref->[$_] . "\n";
                }

                $que_out->enqueue($output);
            }
        }

        sub output {
            open my $fh_out, ">:utf8", "output.txt";

            while ( defined ( my $output = $que_out->dequeue ) ) {
                print {$fh_out} $output;
            }

            close $fh_out;
        }

        MCE::Hobo & MCE::Shared->queue() demonstration

        use strict;
        use warnings;

        use MCE::Hobo;
        use MCE::Shared;
        use Sereal qw( decode_sereal encode_sereal );
        use Time::HiRes qw( time );

        use constant {
            MAX_WORKERS => 3,
            CHUNK_SIZE  => 5,
        };

        # usage: script_que_hobo.pl [ N ]

        my ( @wrks_in, @wrks_out );

        my $que_in  = MCE::Shared->queue( await => 1 );
        my $que_out = MCE::Shared->queue();

        my $start = time;

        push @wrks_in,  MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;
        push @wrks_out, MCE::Hobo->create(\&output);

        input( shift || 1000 );

        $que_in->end;
        $_->join for @wrks_in;

        $que_out->end;
        $_->join for @wrks_out;

        printf "duration: %0.3f\n", time - $start;

        sub input {
            my $max_id = shift;
            my $nxt_id = 0;
            my @data;

            while ( 1 ) {
                last if $nxt_id >= $max_id;
                push @data, ++$nxt_id;

                if ( @data == CHUNK_SIZE ) {
                    $que_in->await(50);   # limit
                    $que_in->enqueue( encode_sereal(\@data) );
                    @data = ();
                }
            }

            $que_in->enqueue( encode_sereal(\@data) ) if @data;
        }

        sub task {
            while ( defined ( my $frozen = $que_in->dequeue ) ) {
                my $array_ref = decode_sereal($frozen);
                my $output    = '';

                for ( 0 .. $#{ $array_ref } ) {
                    $output .= $array_ref->[$_] . "\n";
                }

                $que_out->enqueue($output);
            }
        }

        sub output {
            open my $fh_out, ">:utf8", "output.txt";

            while ( defined ( my $output = $que_out->dequeue ) ) {
                print {$fh_out} $output;
            }

            close $fh_out;
        }

        Update:

        Workers may write directly to the output handle, one at a time. This is possible with a mutex. Do not forget to enable autoflush on the file handle. Recent versions of MCE and MCE::Shared load IO::Handle automatically in the event the autoflush method is not found.

        use strict;
        use warnings;

        use MCE::Hobo;
        use MCE::Mutex;
        use MCE::Shared;
        use Sereal qw( decode_sereal encode_sereal );
        use Time::HiRes qw( time );

        use constant {
            MAX_WORKERS => 3,
            CHUNK_SIZE  => 5,
        };

        # usage: script_que_hobo2.pl [ N ]

        my ( @wrks_in, @wrks_out );

        my $que_in = MCE::Shared->queue( await => 1 );
        my $mutex  = MCE::Mutex->new();

        my $start = time;

        open my $fh_out, ">:utf8", "output.txt";
        $fh_out->autoflush(1);

        push @wrks_in, MCE::Hobo->create(\&task) for 1 .. MAX_WORKERS;

        input( shift || 1000 );

        $que_in->end;
        $_->join for @wrks_in;

        close $fh_out;

        printf "duration: %0.3f\n", time - $start;

        sub input {
            my $max_id = shift;
            my $nxt_id = 0;
            my @data;

            while ( 1 ) {
                last if $nxt_id >= $max_id;
                push @data, ++$nxt_id;

                if ( @data == CHUNK_SIZE ) {
                    $que_in->await(50);
                    $que_in->enqueue( encode_sereal(\@data) );
                    @data = ();
                }
            }

            $que_in->enqueue( encode_sereal(\@data) ) if @data;
        }

        sub task {
            while ( defined ( my $frozen = $que_in->dequeue ) ) {
                my $array_ref = decode_sereal($frozen);
                my $output    = '';

                for ( 0 .. $#{ $array_ref } ) {
                    $output .= $array_ref->[$_] . "\n";
                }

                $mutex->enter(sub { print {$fh_out} $output; });
            }
        }

        Regards, Mario

        Please see my update 4 above. Chunking definitely helped.

        lbe


Re: MCE: Slow IPC between child and gather process in parent
by Anonymous Monk on May 02, 2018 at 07:53 UTC
    Hi. I vaguely recall MCE having different backends; is there a difference in IPC seconds if you use the threads backend or ...? What about a threads.pm/forks.pm version?

      Please see my update above

Re: MCE: Slow IPC between child and gather process in parent
by Anonymous Monk on May 02, 2018 at 15:34 UTC
    My guess is that you actually have generalized I/O contention and/or a higher-than-you-think paging load. Although you say that you have seven processes running, it seems to me that this workload is purely I/O-bound. That would account for the ~40% idle time but it also might account for other delays too. If the work that is being done basically does not involve the CPU, it can be counter-productive to parallelize it. I really don't expect that IPC would be the actual explanation.

      Yes, you could be correct. When developing parallelized programs like this one, I use two tools to help me identify how well or poorly my code is working: htop and iostat. htop is a variation of the venerable top command that originated on VMS and found its way to Linux and the Mac. It lets me monitor processor core utilization and near-real-time performance statistics of my process(es). iostat monitors the read and write rates to my hard disks. I run it as iostat 1 and it provides IO statistics on a second-by-second basis.

      Given my personal experience, the output from these tools indicated that I was not IO bound when running at the longer times - the disk transfer rate was too low for IO to have been a factor. Now, with the reduced times, I am seeing much higher IO rates, and IO is definitely the limiting factor.

      Thanks! lbe

        It could still be IO bound. Disk throughput is just one factor; there are also the wait times for the disk to serve the requests. On linux, use iostat -x to add the disk queue, wait, and service times, to see if they increase significantly while your program is running.

        It can even be a factor on SSD disks, if there are many small reads and writes. In this case the SSD may lag behind because internally it has a minimum block size that it must read or write. For example, if you are reading / writing 8k blocks, but the SSD has a 128k internal block size, the effective top throughput can be 1/16th of the SSD's top throughput.

        I would make the number of workers easily-adjustable and experiment with reducing it one-by-one to find the sweet spot.
Re: MCE: Slow IPC between child and gather process in parent
by Anonymous Monk on May 02, 2018 at 14:08 UTC

    How fast does it run single-threaded?

    Do you need to cross-reference the data between files, or is their processing unrelated?

      Please see my update above

      It has been a long time since I ran through all of the data single threaded. My memory is that it took well over an hour.

      No, none of the files are cross-referenced. Each data file is independent of the others

      Thanks, lbe

Re: MCE: Slow IPC between child and gather process in parent
by Anonymous Monk on May 11, 2018 at 19:11 UTC

    Your listing of Throughput per Thread Count stops at 8 concurrent reads. Disk I/O is usually plotted at NCQ depths of 1, 2, 4, ... 32, and sometimes beyond.

    What model SSD is being used? Single disk, RAID, anything unusual about the setup? Streaming read transfer rate?

    Did you try running the task with elevated I/O privileges, using nice or whatever the ionice analog is on Mac?

    Mmap being faster than read() for small files is curious. Definitely try sysread() with a comfortably large buffer. Straight-up read() ought to result in fewer syscalls, so maybe check the number of context switches and page faults for either approach.

    Lastly, the number of files is quite large. Did you say what the time was to just find+stat the files (or du the directory)?

