PerlMonks  

Re: Multithreading a large file split to multiple files

by marioroy (Priest)
on May 15, 2018 at 02:17 UTC (#1214513)


in reply to Multithreading a large file split to multiple files

Hi 10isitch,

Welcome to the monastery! Without a test sample of the input data, I can't say for sure that the following MCE solution will work; it depends on whether the input contains multi-line records. The highlight: no data passes between the manager process and the workers for input or output, which saves on IPC overhead. I/O is read and written sequentially, not randomly.

    use strict;
    use warnings;

    use MCE;
    use IO::Handle;  # for autoflush

    if (@ARGV != 3) {
        print "Usage: <BNH scenario file> <Equity scenario output> <IR scenario output>\n";
        exit -1;
    }

    print "\nSTARTING $0...\n";

    my ($infile, $eqfile, $irfile) = @ARGV;

    unless (-e $infile) {
        print "Error: Cannot open $infile!\n";
        exit -1;
    }
    unless (open(EQFILE, ">", $eqfile)) {
        print "Error: Cannot open $eqfile!\n";
        exit -1;
    }
    unless (open(IRFILE, ">", $irfile)) {
        print "Error: Cannot open $irfile!\n";
        exit -1;
    }

    # Must enable autoflush whenever workers
    # write directly to output file handles.

    EQFILE->autoflush(1);
    IRFILE->autoflush(1);

    # The user function for MCE workers.
    # Workers open a file handle to a scalar ref
    # due to using MCE option use_slurpio => 1.

    sub user_func {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        my ($eqbuf, $irbuf) = ('','');
        my ($w1, $w2);

        open INFILE, "<", $slurp_ref;

        # The gist of it all is concatenation to buffer
        # string(s) while inside the loop.

        while (<INFILE>) {
            chomp $_;

            if ($_ =~ /ScenSet/) {
                $eqbuf .= "$_\n";
                $irbuf .= "$_\n";
                next;
            }

            my @arr = split/\,/;

            if ($arr[1]) {
                $eqbuf .= ",equity,Base Scenario,0,\n";
                $irbuf .= ",irate,Base Scenario,0,\n";
                next;
            }
            if ($arr[2]) {
                my @arrn = split(/\_/,$arr[2]);
                $eqbuf .= ",,eq_".$arrn[1].",1,\n";
                $irbuf .= ",,ir_".$arrn[1].",1,\n";
                next;
            }
            if ($arr[6]) {
                $w1 = $w2 = 0;
            }

            # Riskfactor contains the string "Index" will be identified
            # as equity riskfactor

            if ($_ =~ /Index/ && $_ !~ /Credit/) {
                $w1 = 1;
            }

            # Assumption for IR - there will be only riskfactor
            # USDSWAP and USDTREA

            if ($_ =~ /USDSWAP/ || $_ =~ /USDTREA/) {
                $w2 = 1;
            }

            if ($w1) {
                $eqbuf .= "$_\n";
            }
            if ($w2) {
                $irbuf .= "$_\n";
            }
        }

        close INFILE;

        # Workers write directly to output files sequentially
        # and orderly, one worker at a time inside the MCE::relay
        # block. Call this one time only and outside the loop.

        MCE::relay {
            print EQFILE $eqbuf if length($eqbuf);
            print IRFILE $irbuf if length($irbuf);
        };

        return;
    }

    # Using the core MCE API. Workers read the input file
    # directly and sequentially, one worker at a time.

    MCE->new(
        max_workers => 3,
        input_data  => $infile,
        chunk_size  => 2 * 1024 * 1024,  # 2 MiB
        use_slurpio => 1,
        init_relay  => 0,                # loads MCE::Relay
        user_func   => \&user_func,
    )->run();

    close EQFILE;
    close IRFILE;
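On the multi-line caveat mentioned at the top: the flags $w1 and $w2 are declared inside user_func, so they start fresh for every chunk. If a chunk boundary falls between a riskfactor line and the lines that follow it, the worker that picks up the continuation lines no longer knows which file they belong to. Here is a minimal sketch of that effect using core Perl only, no MCE. The sample lines and the name route_chunk are made up for illustration, and the routing is a simplified copy of the loop above (the ScenSet and Base Scenario branches are left out).

```perl
use strict;
use warnings;

# Route the lines of one chunk the way a worker would. $chunk_ref is a
# reference to a string, the same shape use_slurpio => 1 hands to user_func.
# (route_chunk is a hypothetical helper, not part of MCE.)
sub route_chunk {
    my ($chunk_ref) = @_;
    my ($eqbuf, $irbuf) = ('', '');
    my ($w1, $w2) = (0, 0);    # flags start fresh for every chunk

    open my $in, '<', $chunk_ref or die "Cannot open chunk: $!";
    while (my $line = <$in>) {
        chomp $line;
        my @arr = split /,/, $line;

        # A non-empty 7th field resets both flags.
        ($w1, $w2) = (0, 0) if $arr[6];

        $w1 = 1 if $line =~ /Index/ && $line !~ /Credit/;
        $w2 = 1 if $line =~ /USDSWAP|USDTREA/;

        $eqbuf .= "$line\n" if $w1;
        $irbuf .= "$line\n" if $w2;
    }
    close $in;
    return ($eqbuf, $irbuf);
}

# Made-up sample: each riskfactor line is followed by one continuation line.
my @lines = (
    ",,,,,,EquityIndex_A,1.0\n",
    ",,,,,,,1.1\n",
    ",,,,,,USDSWAP_5Y,0.02\n",
    ",,,,,,,0.03\n",
);

# Case 1: everything lands in a single chunk.
my $whole = join '', @lines;
my ($eq_whole, $ir_whole) = route_chunk(\$whole);

# Case 2: the chunk boundary falls right after the first line.
my $first = $lines[0];
my $rest  = join '', @lines[1 .. 3];
my ($eq1, $ir1) = route_chunk(\$first);
my ($eq2, $ir2) = route_chunk(\$rest);
my $eq_split = $eq1 . $eq2;
my $ir_split = $ir1 . $ir2;

print "equity: ", ($eq_whole eq $eq_split ? "same" : "differs"), "\n";
print "irate:  ", ($ir_whole eq $ir_split ? "same" : "differs"), "\n";
```

With this sample the equity output differs, because the continuation line ",,,,,,,1.1" is dropped in the split case, while the irate output is the same. If the real input looks like this, the chunks would need to end on record boundaries rather than just line boundaries.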

Regards, Mario
