Fellow Monks,
I received a request from John Martel to process a large flat file and expand each record into many records by splitting out the semicolon-delimited items in field 4. Each row in the output is given a unique ID, starting with one, while preserving output order.
Thank you, John. This is a great use case for MCE::Relay (see Example Two).
Input File -- Possibly larger than 500 GiB in size
foo|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
bar|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
baz|field2|field3|item1;item2;item3;item4;itemN|field5|field6|field7
...
Output File
000000000000001|item1|foo|field2|field3|field5|field6|field7
000000000000002|item2|foo|field2|field3|field5|field6|field7
000000000000003|item3|foo|field2|field3|field5|field6|field7
000000000000004|item4|foo|field2|field3|field5|field6|field7
000000000000005|itemN|foo|field2|field3|field5|field6|field7
000000000000006|item1|bar|field2|field3|field5|field6|field7
000000000000007|item2|bar|field2|field3|field5|field6|field7
000000000000008|item3|bar|field2|field3|field5|field6|field7
000000000000009|item4|bar|field2|field3|field5|field6|field7
000000000000010|itemN|bar|field2|field3|field5|field6|field7
000000000000011|item1|baz|field2|field3|field5|field6|field7
000000000000012|item2|baz|field2|field3|field5|field6|field7
000000000000013|item3|baz|field2|field3|field5|field6|field7
000000000000014|item4|baz|field2|field3|field5|field6|field7
000000000000015|itemN|baz|field2|field3|field5|field6|field7
...
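For reference, here is the same transformation done serially in plain Perl, without MCE. This is only a minimal sketch: `expand_record` and `expand_serial` are hypothetical helper names introduced here, and the demo records are shortened versions of the samples above. It can serve as a correctness baseline for comparing against the parallel output on small inputs.

```perl
use strict;
use warnings;

# Hypothetical helper: turn one input record into its output rows
# (without the ID). Field 4 holds the semicolon-delimited items; it is
# dropped, and each item is placed in front of the remaining fields.
sub expand_record {
    my ($line) = @_;
    $line =~ s/\r//g;
    chomp $line;
    my ($f1, $f2, $f3, $items, $f5, $f6, $f7) = split /\|/, $line;
    return map { "$_|$f1|$f2|$f3|$f5|$f6|$f7\n" } split /;/, $items;
}

# Hypothetical helper: read records from $in, write numbered rows to $out.
# IDs start at 1 and are zero-padded to 15 digits. Returns the row count.
sub expand_serial {
    my ($in, $out) = @_;
    my $id = 1;
    while (my $line = <$in>) {
        printf {$out} "%015d|%s", $id++, $_ for expand_record($line);
    }
    return $id - 1;
}

# Demo: two sample records read from an in-memory file handle.
my $sample = "foo|field2|field3|item1;item2|field5|field6|field7\n"
           . "bar|field2|field3|item1|field5|field6|field7\n";
open my $in, '<', \$sample or die "open error: $!\n";
expand_serial($in, \*STDOUT);
```

A single pass like this is simple but runs on one core, which is what the two MCE examples below address.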
Example One
This example configures a custom gather function for preserving output order. Unfortunately, calling sprintf for every output line in the manager process costs enough CPU time that the manager falls behind. Workers may then idle while waiting for the manager process to respond to their gather requests.
use strict;
use warnings;

use MCE::Loop;

my $infile = shift or die "Usage: $0 infile\n";
my $newfile = 'output.dat';

open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";

# Gather callback: hold chunks that arrive early, then write them in
# chunk_id order, numbering each line as it is printed.
sub preserve_order {
    my ($fh) = @_;
    my ($order_id, $start_idx, $idx, %tmp) = (1, 1);

    return sub {
        my ($chunk_id, $aref) = @_;
        $tmp{ $chunk_id } = $aref;

        while ( my $aref = delete $tmp{ $order_id } ) {
            foreach my $line ( @{ $aref } ) {
                # sprintf runs in the manager process for every line
                $idx = sprintf "%015d", $start_idx++;
                print $fh $idx, $line;
            }
            $order_id++;
        }
    }
}

MCE::Loop::init {
    chunk_size => 'auto', max_workers => 3,
    gather => preserve_order($fh_out)
};

mce_loop_f {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my @buf;

    foreach my $line (@{ $chunk_ref }) {
        $line =~ s/\r//g; chomp $line;
        # field 4 holds the semicolon-delimited items
        my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
        my @items_array = split /;/, $items;
        foreach my $item (@items_array) {
            # leading '|' separates the ID that the manager prepends
            push @buf, "|$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
        }
    }

    # send the whole chunk to the manager, tagged with its chunk_id
    MCE->gather($chunk_id, \@buf);
} $infile;

MCE::Loop::finish();
close $fh_out;
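The ordering logic in preserve_order does not depend on MCE, so it can be exercised on its own. The sketch below is a stand-alone restatement of that idea; `make_ordered_writer` is a hypothetical name, and the chunks are fed in a scrambled order to show that they are still written in chunk_id order with consecutive IDs. The per-line formatting inside the loop is the work that, in Example One, runs in the manager process.

```perl
use strict;
use warnings;

# Hypothetical stand-alone version of preserve_order(): chunks may arrive
# in any order; each is parked in %pending until all earlier chunks have
# been written, then its lines are numbered and printed.
sub make_ordered_writer {
    my ($fh) = @_;
    my ($next_chunk, $next_id, %pending) = (1, 1);
    return sub {
        my ($chunk_id, $lines) = @_;
        $pending{$chunk_id} = $lines;
        while (my $ready = delete $pending{$next_chunk}) {
            printf {$fh} "%015d%s", $next_id++, $_ for @{$ready};
            $next_chunk++;
        }
    };
}

# Demo: chunk 2 arrives before chunk 1, and chunk 3 is empty.
my $writer = make_ordered_writer(\*STDOUT);
$writer->(2, ["|c\n"]);           # buffered: chunk 1 not seen yet
$writer->(1, ["|a\n", "|b\n"]);   # writes chunk 1, then chunk 2
$writer->(3, []);                 # empty chunk still advances the order
$writer->(4, ["|d\n"]);           # written immediately
```

Note that an empty chunk must still be handed to the writer (as Example One does by always calling gather); otherwise every later chunk would stay parked.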
Example Two
To factor sprintf out of the manager process, another approach is to use MCE::Relay to increment the ID value. Each worker obtains the current ID value, then increments it and relays it to the next worker; behind the scenes, the relay is ordered by chunk ID. Workers then call sprintf in parallel. This allows the manager process (out_iter_fh) to keep up with as many as 32 workers without falling behind. Of course, it also depends on I/O performance.
The MCE::Relay module is loaded automatically whenever the MCE init_relay option is specified.
use strict;
use warnings;

use MCE::Loop;
use MCE::Candy;

my $infile = shift or die "Usage: $0 infile\n";
my $newfile = 'output.dat';

open my $fh_out, '>', $newfile or die "open error $newfile: $!\n";

MCE::Loop::init {
    chunk_size => 'auto', max_workers => 8,
    gather => MCE::Candy::out_iter_fh($fh_out),  # writes in chunk_id order
    init_relay => 1                              # first ID handed out is 1
};

mce_loop_f {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my @lines;

    foreach my $line (@{ $chunk_ref }) {
        $line =~ s/\r//g; chomp $line;
        # field 4 holds the semicolon-delimited items
        my ($f1,$f2,$f3,$items,$f5,$f6,$f7) = split /\|/, $line;
        my @items_array = split /;/, $items;
        foreach my $item (@items_array) {
            push @lines, "$item|$f1|$f2|$f3|$f5|$f6|$f7\n";
        }
    }

    # Obtain this chunk's starting ID and relay ID + (rows in this chunk)
    # to the worker handling the next chunk_id. Call relay once per chunk.
    my $idx = MCE::relay { $_ += scalar @lines };

    # Format the IDs here, in parallel, rather than in the manager process.
    my $buf = '';
    foreach my $line ( @lines ) {
        $buf .= sprintf "%015d|%s", $idx++, $line;
    }

    MCE->gather($chunk_id, $buf);
} $infile;

MCE::Loop::finish();
close $fh_out;
Relay is ordered by chunk_id: each worker passes the updated value to the worker handling the next chunk_id. Therefore, do not call relay more than once inside the block. Doing so will cause the IPC to stall.
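To see why one `$_ += scalar @lines` per chunk yields gap-free IDs, the relay can be modeled sequentially: taken in chunk_id order, it amounts to an exclusive prefix sum over the per-chunk row counts. This is a plain-Perl model, not MCE code; `starting_ids` is a hypothetical name, and it assumes, as Example Two's output relies on, that relay returns the value a worker received before its block adds to `$_`.

```perl
use strict;
use warnings;

# Hypothetical sequential model of the relay in Example Two (no MCE).
# Chunks are visited in chunk_id order. Each chunk keeps the value it
# receives as its first ID and relays value + (its row count) onward,
# which makes the starting IDs an exclusive prefix sum of the counts.
sub starting_ids {
    my ($first_id, @row_counts) = @_;
    my $relay = $first_id;          # init_relay => 1 in Example Two
    my @starts;
    for my $count (@row_counts) {
        push @starts, $relay;       # assumed: what MCE::relay returns here
        $relay += $count;           # what is passed to the next chunk
    }
    return @starts;
}

# Demo: four chunks producing 5, 0, 3 and 2 output rows.
my @starts = starting_ids(1, 5, 0, 3, 2);
print "@starts\n";                  # prints "1 6 6 9"
```

In the real program each worker only waits for this small handoff; the sprintf formatting of its rows happens afterwards, in parallel with the other workers.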
Regards, Mario