Beefy Boxes and Bandwidth Generously Provided by pair Networks
Think about Loose Coupling
 
PerlMonks  

MCE gather and relay demonstrations

by marioroy (Priest)
on Feb 13, 2018 at 05:32 UTC ( #1209040=CUFP: print w/replies, xml ) Need Help??

Fellow Monks,

I received a request from John Martel to process a large flat file and expand each record to many records based on splitting out items in field 4 delimited by semicolons. Each row in the output is given a unique ID starting with one while preserving output order.

Thank you, John. This is a great use-case for MCE::Relay (2nd example).

Input File -- Possibly larger than 500 GiB in size

foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7 bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7 baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7 ...

Output File

000000000000001|item1|foo|field2|field3|field5|field6|field7 000000000000002|item2|foo|field2|field3|field5|field6|field7 000000000000003|item3|foo|field2|field3|field5|field6|field7 000000000000004|item4|foo|field2|field3|field5|field6|field7 000000000000005|itemN|foo|field2|field3|field5|field6|field7 000000000000006|item1|bar|field2|field3|field5|field6|field7 000000000000007|item2|bar|field2|field3|field5|field6|field7 000000000000008|item3|bar|field2|field3|field5|field6|field7 000000000000009|item4|bar|field2|field3|field5|field6|field7 000000000000010|itemN|bar|field2|field3|field5|field6|field7 000000000000011|item1|baz|field2|field3|field5|field6|field7 000000000000012|item2|baz|field2|field3|field5|field6|field7 000000000000013|item3|baz|field2|field3|field5|field6|field7 000000000000014|item4|baz|field2|field3|field5|field6|field7 000000000000015|itemN|baz|field2|field3|field5|field6|field7 ...

Example One

This example configures a custom function for preserving output order. Unfortunately, the sprintf function alone involves extra CPU time causing the manager process to fall behind. The workers may idle while waiting for the manager process to respond to the gather request.

use strict; use warnings; use MCE::Loop; my $infile = shift or die "Usage: $0 infile\n"; my $newfile = 'output.dat'; open my $fh_out, '>', $newfile or die "open error $newfile: $!\n"; sub preserve_order { my ($fh) = @_; my ($order_id, $start_idx, $idx, %tmp) = (1, 1); return sub { my ($chunk_id, $aref) = @_; $tmp{ $chunk_id } = $aref; while ( my $aref = delete $tmp{ $order_id } ) { foreach my $line ( @{ $aref } ) { $idx = sprintf "%015d", $start_idx++; print $fh $idx, $line; } $order_id++; } } } MCE::Loop::init { chunk_size => 'auto', max_workers => 3, gather => preserve_order($fh_out) }; mce_loop_f { my ($mce, $chunk_ref, $chunk_id) = @_; my @buf; foreach my $line (@{ $chunk_ref }) { $line =~ s/\r//g; chomp $line; my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line; my @items_array = split /;/, $items; foreach my $item (@items_array) { push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n"; } } MCE->gather($chunk_id, \@buf); } $infile; MCE::Loop::finish(); close $fh_out;

Example Two

To factor out sprintf from the manager process, another way is via MCE::Relay for incrementing the ID value. Workers obtain the current ID value and increment/relay for the next worker, ordered by chunk ID behind the scene. Workers call sprintf in parallel. This allows the manager process (out_iter_fh) to accommodate up to 32 workers and not fall behind. It also depends on IO performance, of course.

The MCE::Relay module is loaded automatically whenever the MCE init_relay option is specified.

use strict; use warnings; use MCE::Loop; use MCE::Candy; my $infile = shift or die "Usage: $0 infile\n"; my $newfile = 'output.dat'; open my $fh_out, '>', $newfile or die "open error $newfile: $!\n"; MCE::Loop::init { chunk_size => 'auto', max_workers => 8, gather => MCE::Candy::out_iter_fh($fh_out), init_relay => 1 }; mce_loop_f { my ($mce, $chunk_ref, $chunk_id) = @_; my @lines; foreach my $line (@{ $chunk_ref }) { $line =~ s/\r//g; chomp $line; my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line; my @items_array = split /;/, $items; foreach my $item (@items_array) { push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n"; } } my $idx = MCE::relay { $_ += scalar @lines }; my $buf = ''; foreach my $line ( @lines ) { $buf .= sprintf "%015d|%s", $idx++, $line } MCE->gather($chunk_id, $buf); } $infile; MCE::Loop::finish(); close $fh_out;

Relay accounts for the worker handling the next chunk_id value. Therefore, do not call relay more than once inside the block. Doing so will cause IPC to stall.

Regards, Mario

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: CUFP [id://1209040]
Approved by beech
Front-paged by Discipulus
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others meditating upon the Monastery: (10)
As of 2018-10-15 13:04 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    When I need money for a bigger acquisition, I usually ...














    Results (82 votes). Check out past polls.

    Notices?