Re: Wait for individual sub processes

by marioroy (Prior)
on Apr 25, 2015 at 12:30 UTC


in reply to Wait for individual sub processes [SOLVED]

The following is an example using MCE to process a file. Splitting the file into parts before running is not necessary; chunking is integrated into MCE, allowing maximum CPU utilization from start to finish.

Update 1: Added merge_to_iter to merge the output into one file while preserving order.

Update 2: $slurp_ref is a scalar reference, thus print $fh $$slurp_ref;

Update 3: Changed chunk_size from 'auto' to 200. A chunk_size value less than or equal to 8192 is treated as a number of records (lines); a value greater than 8192 is treated as a number of bytes, with MCE reading through to the end of the record. Values above 64M are quietly capped at 64M. The OP provided timings in which 100 rows took about 1 minute.

Update 4: Changed max_workers from 'auto' to 16. The 'auto' value never goes higher than 8, so max_workers must be set explicitly to run on all available cores, e.g. max_workers => MCE::Util::get_ncpu(). A small sketch of both options follows.
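As a minimal sketch of Updates 3 and 4 (assuming only that MCE::Flow and MCE::Util are installed; the values are illustrative, not a recommendation), the two options can be set explicitly before running:

use MCE::Flow;
use MCE::Util;

MCE::Flow->init(
   max_workers => MCE::Util::get_ncpu(),   # all logical cores, not capped at 8
   chunk_size  => 200,                     # <= 8192 is interpreted as records (lines)
   # chunk_size => 2 * 1024 * 1024,        # > 8192 is interpreted as bytes
);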

use MCE::Flow;

die "Not enough arguments given\n" if @ARGV < 1;

my $file = shift;
my $odir = "/path/to/output_dir";

sub merge_to_iter {
   my ($ofile) = @_;
   my %tmp; my $order_id = 1;

   open my $ofh, '>', $ofile or die "Cannot open $ofile: $!\n";
   select $ofh; $| = 1;   # flush immediately

   return sub {
      my ($chunk_id, $opart) = @_;
      $tmp{$chunk_id} = $opart;

      while (1) {
         last unless exists $tmp{ $order_id };
         $opart = delete $tmp{ $order_id++ };

         # slurp (append) $ifh to $ofh
         open my $ifh, '<', $opart or die "Cannot open $opart: $!\n";
         local $/; print $ofh scalar <$ifh>;
         close $ifh;

         unlink $opart;
      }
   };
}

mce_flow_f {
   gather      => merge_to_iter("$odir/$file.out"),
   max_workers => 16,
   chunk_size  => 200,
   use_slurpio => 1,
},
sub {
   my ($mce, $slurp_ref, $chunk_id) = @_;
   my $part = "$odir/$file.$chunk_id";

   open my $fh, '>', $part or die "Cannot open $part: $!\n";
   print $fh $$slurp_ref;
   close $fh;

   # run the external tool on this part; system (not exec) so the worker continues
   system("sh text_tool $part > $part.out") == 0
      or die "Cannot run text_tool on $part\n";

   print {*STDERR} "Finished processing $part at " . localtime . "\n";

   $mce->gather($chunk_id, "$part.out");
   unlink $part;

}, $file;
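For reference, the script takes the input file as its single argument; the output directory in $odir and the text_tool helper are assumed to exist. Saved under an arbitrary name such as mce_parts.pl, it would be run as perl mce_parts.pl input.txt, with the merged result appearing at $odir/input.txt.out.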

Re^2: Wait for individual sub processes
by BrowserUk (Patriarch) on Apr 25, 2015 at 12:39 UTC
    Splitting the file into parts before running is not necessary. Chunking is integrated into MCE

    But does it solve the OP's problem of uneven processor use due to disparate processing requirements of records?

    If so, how?

    Does it reassemble the output in the correct order without allowing the slow processing of some records to block processing or output from other subsequent records?

    If so, how?


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I'm with torvalds on this
    In the absence of evidence, opinion is indistinguishable from prejudice. Agile (and TDD) debunked

      Yes and yes. I added merge_to_iter to the MCE example.

      MCE follows a bank-teller queuing model when processing input. A slow chunk will not delay or block subsequent chunks. Each chunk comes with a $chunk_id value, which is useful for preserving output order. Items gathered out of order are held temporarily until the next expected chunk_id arrives. A small standalone sketch of this appears at the end of this reply.

      MCE processes immediately. Thus, the pre-processing step to split the file into parts is not necessary.

      I applied a correction to the example; $slurp_ref is a scalar reference, thus print $fh $$slurp_ref;

      The merge_to_iter iterator itself is executed by the manager process while the job runs.
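      As a minimal standalone sketch of the ordered gathering described above (not the merge_to_iter code itself; ordered_output, the worker body, and the sleep are illustrative only), the same pattern with plain numbers looks like this:

      use strict; use warnings;
      use MCE::Flow;

      # Hold results keyed by chunk_id and flush them in chunk_id order.
      sub ordered_output {
         my %hold; my $want = 1;
         return sub {
            my ($chunk_id, $result) = @_;
            $hold{$chunk_id} = $result;
            print delete $hold{$want++} while exists $hold{$want};
         };
      }

      mce_flow {
         gather => ordered_output(), max_workers => 4, chunk_size => 1,
      }, sub {
         my ($mce, $chunk_ref, $chunk_id) = @_;
         sleep int rand 2;   # simulate uneven work per record
         $mce->gather($chunk_id, "item $chunk_ref->[0] (chunk $chunk_id) done\n");
      }, 1 .. 10;

      The printed lines come out in input order even though workers finish at different times.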

        Items gathered out of order are held temporarily until the next expected chunk_id arrives.

        So, if one of the early records takes an exceptionally long time to process, all the outputs from records processed after it will accumulate in memory until that record finally finishes, thus risking memory exhaustion?

        If so, is there any mechanism, automated or manual, for detecting that memory accumulation and suspending chunk dispatch until the exceptionally slow record is processed and the output released?


