Re^5: shared scalar freed early

by marioroy (Prior)
on Mar 08, 2017 at 06:00 UTC


in reply to Re^4: shared scalar freed early
in thread shared scalar freed early

Hello,

Below is another variation. Here, workers write directly to the output handle, similar to testa. When the init_relay option is defined, MCE loads MCE::Relay and enables relay capabilities. Relay is useful where workers must run serially and in order. Only one worker at a time can run inside the relay block below, and workers enter it in chunk_id order. In other words, workers wait their turn: the worker with chunk_id 1 goes first, then the worker with chunk_id 2, and so forth.

I forgot to mention that MCE can spawn threads. Simply add "use threads" at the top of the script, before loading MCE. This allows the use of Thread::Queue, Thread::Semaphore, and friends. If curious, compare the memory consumption of testa against this one. I increased $iterations to 1000 to be able to monitor the process in another window. Typically, running without threads is faster on Unix. Either way, the choice is yours if threads are a better fit, e.g. if you want to use Thread::Queue.

use strict;
use warnings;

use MCE;
use Time::HiRes 'time';

my $iterations = 100;
my $chunksize  = 50;
my $threads    = 5;
my $output     = "m.txt";
my %data       = ();

foreach ('a'..'z') {
    $data{$_} = $_ x 200;
}

open my $fh, '>', $output or die "open error: $!";
$fh->autoflush(1);

test_mce();

close $fh;

sub test_mce {
    my $start = time;
    my $mce = MCE->new(
        max_workers => $threads,
        chunk_size  => $chunksize,
        input_data  => input_iter($chunksize, $iterations),
        user_func   => \&work,
        init_relay  => 0,
    )->run();
    printf STDERR "testa done in %0.02f seconds\n", time - $start;
}

# make an input closure, return iterator
sub input_iter {
    my ($chunk_size, $iterations) = @_;
    my $seq_a = 1;
    return sub {
        return if $seq_a > $iterations;
        my ($chunk_size) = @_;
        my @chunk = ();
        foreach my $seq_b ( 1 .. $chunk_size ) {
            my %retdata = %data;
            $retdata{'.'} = $seq_a * $seq_b;
            push @chunk, \%retdata;
        }
        $seq_a += 1;
        return \@chunk;
    };
}

# MCE task to run in parallel
sub work {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my $data = $chunk_ref->[0];
    my @ret = ();
    foreach my $chunk (@$data) {
        my %output = ();
        foreach my $key (keys %$chunk) {
            if ($key eq '.') {
                $output{$key} = $$chunk{$key};
                next;
            }
            my $val = $$chunk{$key};
            my $uc = uc($key);
            $val =~ s/$key/$uc/g;
            $output{$key} = $val;
        }
        push(@ret,\%output);
    }
    my $buf = '';
    foreach my $data (@ret) {
        foreach my $key (sort keys %$data) {
            $buf .= $$data{$key};
        }
        $buf .= "\n";
    }
    MCE::relay { print {$fh} $buf };
}

Regards, Mario.
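
A stripped-down sketch of the relay pattern in the post above may make the ordering easier to see. It assumes MCE is installed. The file name relay_demo.txt, the eight-letter input list, and the random sleep are illustrative choices, not part of the original script.

```perl
use strict;
use warnings;

use IO::Handle;
use MCE;
use Time::HiRes 'sleep';

# Hypothetical output file, used only for this sketch.
my $demo_file = "relay_demo.txt";

open my $demo_fh, '>', $demo_file or die "open error: $!";
$demo_fh->autoflush(1);   # flush each write before the next worker takes its turn

MCE->new(
    max_workers => 4,
    chunk_size  => 1,               # one item per chunk
    input_data  => [ 'a' .. 'h' ],
    init_relay  => 0,               # defining this option enables MCE::relay
    user_func   => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $item = $chunk_ref->[0];

        # Parallel part: workers finish in no particular order.
        sleep(rand(0.05));
        my $buf = "$chunk_id: " . uc($item) . "\n";

        # Serial part: workers enter this block by chunk_id, one at a time.
        MCE::relay { print {$demo_fh} $buf };
    },
)->run();

close $demo_fh;
```

Despite the random delays, relay_demo.txt should list chunks 1 through 8 in order, because each worker waits for the worker holding the previous chunk_id to leave the relay block.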

Re^6: shared scalar freed early
by chris212 (Scribe) on Mar 08, 2017 at 18:21 UTC
    That is exactly what I needed. The final piece of the puzzle to help me understand how to effectively use MCE in my script. It is now faster than ever, doesn't use any shared variables, and the parallelization is completely handled by MCE. It just took me a bit to realize I needed to remove my signal handlers. The relays must use SIGINT or something. It even takes my input file handle and does the chunked reading for me, and works with IPC file handles from the gzip command. The relay blocks as needed to keep output in sequence. Thanks a lot!

      Below is an example using MCE::Flow. It chunks and outputs swiftly. When the init_relay option is defined, MCE loads MCE::Relay and enables relay capabilities. Relay is useful where workers must run serially and in order. Only one worker at a time can run inside the relay block, and workers enter it in order of their "chunk id" value.

      If threads are desired on a Unix platform, simply load threads before loading MCE. By default, MCE spawns threads if the threads module is present. Unlike MCE.pm, where chunk_size defaults to 1, chunk_size is configured automatically for MCE models.

      use strict;
      use warnings;

      use MCE::Flow;

      ## Make gzip file
      {
          open my $fh, '|-', 'gzip > test.txt.gz';
          foreach (1..100000) {
              print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
          }
          close $fh;
      }

      ## Read gzip file
      open my $fh, '-|', 'gzip -cd test.txt.gz' or die "open error: $!\n";

      STDOUT->autoflush(1);   # important

      MCE::Flow->init( max_workers => 3, init_relay => 0 );

      sub test {
          my ($mce, $chunkref, $chunkid) = @_;
          my ($buf, $wid) = ('', MCE->wid());
          for my $i (0 .. $#{ $chunkref }) {
              $buf .= $chunkref->[$i];
          }
          MCE::relay {
              print "## worker: $wid, chunkid: $chunkid\n".$buf;
          };
      }

      MCE::Flow->run_file(\&test, $fh);
      MCE::Flow->finish();

      close($fh);

      Cheers, Mario.

      You're welcome. Regarding relay capabilities, the worker that happens to have chunk 1 goes first, then the worker with chunk 2, and so forth. In other words, workers enter the relay block in "chunk id" order.

      Regards, Mario.
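
To see how chunked reading from a handle and the relay order fit together, here is a small sketch along the lines of the MCE::Flow reply above. It assumes MCE is installed. The file names flow_in.txt and flow_out.txt, the sub name summarize, and the explicit chunk_size of 4 are assumptions made for illustration; the original example leaves chunk_size to MCE's automatic setting and reads from a gzip pipe instead of a plain file.

```perl
use strict;
use warnings;

use IO::Handle;
use MCE::Flow;

# Hypothetical file names, used only for this sketch.
my ($flow_input, $flow_report) = ('flow_in.txt', 'flow_out.txt');

# Write ten numbered lines to read back in chunks.
open my $flow_w, '>', $flow_input or die "open error: $!";
print {$flow_w} "line $_\n" for 1 .. 10;
close $flow_w;

open my $flow_in,  '<', $flow_input  or die "open error: $!";
open my $flow_out, '>', $flow_report or die "open error: $!";
$flow_out->autoflush(1);   # flush each write before the next worker's turn

# An explicit chunk_size replaces the automatic setting: 4 lines per chunk.
MCE::Flow->init( max_workers => 3, chunk_size => 4, init_relay => 0 );

sub summarize {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    my $count = scalar @{ $chunk_ref };   # number of lines in this chunk

    # Workers enter the relay block in chunk id order.
    MCE::relay { print {$flow_out} "chunk $chunk_id: $count lines\n" };
}

MCE::Flow->run_file(\&summarize, $flow_in);
MCE::Flow->finish();

close $flow_in;
close $flow_out;
```

With ten input lines and four lines per chunk, flow_out.txt should report three chunks, in order, with the last one holding the remaining two lines.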
