PerlMonks  

Re: Multi-thread combining the results together

by marioroy (Vicar)
on Jul 26, 2019 at 03:22 UTC (#11103411=note)


in reply to Multi-thread combining the results together

Hello, fellow monks of the monastery,

This post measures how long the various parallel implementations take. Please compare results from left to right (across implementations) rather than between OSes. The results were captured on a MacBook Pro (late 2013 model) with an Intel Core i7 CPU that has 4 real cores. The spec says 2.6 GHz, but it may run at up to 3.6 GHz on one core with Turbo Boost in effect.

```
          Serial    threads   MCE::Hobo  MCE::Loop  MCE::Child
Linux     74.488s   25.345s   20.217s    17.932s    17.291s
macOS     84.010s   24.844s   24.505s    23.786s    23.069s
Windows  102.055s   29.702s   37.913s    28.376s    27.839s
Cygwin   162.474s   -------   50.092s    45.318s    43.949s
FreeBSD  107.569s   -------   31.138s    30.530s    30.050s
```

threads is slow on Cygwin, so I stopped that run because it was taking too long. My FreeBSD install has an older Thread::Queue that is missing the ->end method.
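Reading the table from left to right amounts to a speedup ratio per OS: serial time divided by parallel time. The short sketch below computes that ratio from the numbers above; the hash layout and the `speedup` helper are my own naming, for illustration only. The ratios inherit the same Turbo Boost caveat as the raw timings.

```perl
use strict;
use warnings;

# Timings in seconds, copied from the table above: serial first, then
# threads, MCE::Hobo, MCE::Loop, MCE::Child. undef marks a run not performed.
my @columns = ('threads', 'MCE::Hobo', 'MCE::Loop', 'MCE::Child');
my %timings = (
    Linux   => [  74.488, 25.345, 20.217, 17.932, 17.291 ],
    macOS   => [  84.010, 24.844, 24.505, 23.786, 23.069 ],
    Windows => [ 102.055, 29.702, 37.913, 28.376, 27.839 ],
    Cygwin  => [ 162.474, undef,  50.092, 45.318, 43.949 ],
    FreeBSD => [ 107.569, undef,  31.138, 30.530, 30.050 ],
);

# Speedup of a parallel run over the serial run; undef if there is no run.
sub speedup {
    my ($serial, $parallel) = @_;
    return defined $parallel ? $serial / $parallel : undef;
}

for my $os (qw(Linux macOS Windows Cygwin FreeBSD)) {
    my ($serial, @parallel) = @{ $timings{$os} };
    my @ratios = map {
        my $s = speedup($serial, $_);
        defined $s ? sprintf('%.2fx', $s) : '-';
    } @parallel;
    printf "%-8s %s\n", $os,
        join('  ', map { sprintf '%s=%s', $columns[$_], $ratios[$_] } 0 .. $#columns);
}
```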

Serial Code

I very much like to know how long the code takes on a single core. These days, processors have Turbo Boost capabilities, which makes it hard to compare total time between serial and parallel runs: the processor may be running at a higher clock speed when only one core is in use. So be mindful of that.

```perl
use strict;
use warnings;
use Time::HiRes 'time';

my @tokens = ('aaa' ... 'zzz');
my $start  = time;

# Replace the last character with 'a', e.g. 'abc' -> 'aba'.
sub build_regex {
    my ($token) = @_;
    chop $token;
    $token .= 'a';
}

my %result;

foreach my $token ( @tokens ) {
    my $regex = build_regex($token);
    my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
    $result{$token} = \@line_results if @line_results;
}

printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n";  # 16900
```
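Why 16900? build_regex replaces the last character with 'a', and among three-letter tokens the only one containing a given three-letter pattern is the pattern itself. So every token whose last letter is not 'a' matches exactly one other token, while tokens ending in 'a' match only themselves and are skipped: 26 * 26 * 25 = 16900. The sketch below checks that reasoning on a two-letter token set (my choice, so it runs instantly), where the same argument predicts 26 * 25 = 650.

```perl
use strict;
use warnings;

# Same helper as above, with an explicit return: 'abc' -> 'aba'.
sub build_regex {
    my ($token) = @_;
    chop $token;
    return $token . 'a';
}

# Two-letter tokens keep this check fast; the argument is the same for 'aaa'...'zzz'.
my @tokens = ('aa' .. 'zz');

my %result;
for my $token (@tokens) {
    my $regex = build_regex($token);
    my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
    $result{$token} = \@line_results if @line_results;
}

my $predicted = 26 * 25;   # tokens whose last letter is not 'a'
printf "found %d, predicted %d\n", scalar(keys %result), $predicted;
```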

threads

This is based on Grandfather's example, modified to run the serial demonstration in parallel (i.e. on the same input data). Because workers enqueue key => value pairs, the manager process must dequeue two items at a time (dequeue(2)).

```perl
use strict;
use warnings;

use threads;
use Thread::Queue;
use Time::HiRes 'time';

my $workQueue  = Thread::Queue->new();
my $doneQueue  = Thread::Queue->new();
my $numWorkers = 4;

my @tokens = ('aaa' ... 'zzz');
my $start  = time;

sub build_regex {
    my ($token) = @_;
    chop $token;
    $token .= 'a';
}

threads->create(sub { DoWork($workQueue, $doneQueue) }) for 1 .. $numWorkers;

$workQueue->enqueue($_) for @tokens;
$workQueue->end();

my $count_finished = 0;
my %result;

while () {
    # Workers enqueue key => value pairs, so dequeue two items at a time.
    my ($key, $val) = $doneQueue->dequeue(2);
    # A (0, 0) pair signals that one worker has finished.
    last if (!$key && ++$count_finished == $numWorkers);
    $result{$key} = $val if $key;
}

$_->join() for threads->list;

printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n";  # 16900

exit;

sub DoWork {
    my ($work, $done) = @_;

    while (my $token = $work->dequeue()) {
        my $regex = build_regex($token);
        my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
        $done->enqueue($token => \@line_results) if @line_results;
    }

    $done->enqueue(0, 0);
}
```
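The manager loop relies on two conventions: results travel as key/value pairs, and each worker signals completion with a (0, 0) pair. The sketch below replays that protocol without threads, using a plain array as a stand-in for the doneQueue and `splice` in place of `dequeue(2)`; the stand-in array and its sample contents are assumptions for illustration, not real output.

```perl
use strict;
use warnings;

my $numWorkers = 2;

# Stand-in for the doneQueue (illustrative data): each worker sends
# key => value pairs and finishes with a (0, 0) sentinel pair.
my @doneQueue = (
    'abc' => ['aba'],
    0     => 0,        # first worker finished
    'xyz' => ['xya'],
    0     => 0,        # second worker finished
);

my $count_finished = 0;
my %result;

# A real queue would block while empty; the array simply runs out here.
while (@doneQueue) {
    my ($key, $val) = splice(@doneQueue, 0, 2);   # like dequeue(2)
    last if (!$key && ++$count_finished == $numWorkers);
    $result{$key} = $val if $key;
}

printf "%d workers finished, %d results\n", $count_finished, scalar(keys %result);
```

The sentinel works because every token is a true value in Perl; a key such as '0' or the empty string would be mistaken for a worker's end-of-work signal.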

MCE::Hobo

Fortunately, if your Perl binary lacks threads support, the same approach can be run using MCE::Hobo with MCE::Shared queues.

```perl
use strict;
use warnings;

use MCE::Hobo;
use MCE::Shared;
use Time::HiRes 'time';

my $workQueue  = MCE::Shared->queue();
my $doneQueue  = MCE::Shared->queue();
my $numWorkers = 4;

my @tokens = ('aaa' ... 'zzz');
my $start  = time;

sub build_regex {
    my ($token) = @_;
    chop $token;
    $token .= 'a';
}

MCE::Hobo->create(sub { DoWork($workQueue, $doneQueue) }) for 1 .. $numWorkers;

$workQueue->enqueue($_) for @tokens;
$workQueue->end();

my $count_finished = 0;
my %result;

while () {
    my ($key, $val) = $doneQueue->dequeue(2);
    last if (!$key && ++$count_finished == $numWorkers);
    $result{$key} = $val if $key;
}

$_->join() for MCE::Hobo->list;

printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n";  # 16900

exit;

sub DoWork {
    my ($work, $done) = @_;

    while (my $token = $work->dequeue()) {
        my $regex = build_regex($token);
        my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
        $done->enqueue($token => \@line_results) if @line_results;
    }

    $done->enqueue(0, 0);
}
```
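Whether a perl binary was built with threads support can be checked through the core Config module: `$Config{useithreads}` is set when ithreads are enabled. A minimal sketch for choosing between the threads and MCE::Hobo versions; the `has_ithreads` helper is a hypothetical name used only here.

```perl
use strict;
use warnings;
use Config;

# 1 when this perl was built with ithreads, 0 otherwise.
sub has_ithreads {
    return $Config{useithreads} ? 1 : 0;
}

if (has_ithreads()) {
    print "threads available: the threads/Thread::Queue version can run\n";
} else {
    print "no threads support: use the MCE::Hobo/MCE::Shared version\n";
}
```

From the shell, `perl -V:useithreads` reports the same build setting.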

MCE::Loop

Think of MCE's input mechanism as the workQueue. Likewise, think of MCE's gather capability as sending to a doneQueue. This is basically what happens internally, and MCE handles it efficiently.

```perl
use strict;
use warnings;

use MCE::Loop;
use Time::HiRes 'time';

my @tokens = ('aaa' ... 'zzz');
my $start  = time;

sub build_regex {
    my ($token) = @_;
    chop $token;
    $token .= 'a';
}

MCE::Loop->init(
    chunk_size  => 1,
    max_workers => 4,
);

my %result = mce_loop {
    my $token = $_;
    my $regex = build_regex($token);
    my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
    MCE->gather( $token => \@line_results ) if @line_results;
} @tokens;

MCE::Loop->finish;

printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n";  # 16900
```
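To make the workQueue/doneQueue analogy concrete, here is a serial, single-process model of what the mce_loop block above relies on: each input item is handed to the block in `$_`, and everything passed to gather is collected and returned as one list of key/value pairs. `serial_loop` and `gather` are hypothetical stand-ins written for this illustration, not MCE's implementation, which runs the block in parallel workers and gathers over IPC. It uses two-letter tokens so it finishes quickly.

```perl
use strict;
use warnings;

# Hypothetical serial stand-ins, for illustration only (not MCE's code).
my @gathered;                           # plays the role of the doneQueue

sub gather { push @gathered, @_ }       # queue items for the "manager"

sub serial_loop (&@) {
    my ($code, @input) = @_;            # @input plays the role of the workQueue
    @gathered = ();
    for my $item (@input) {
        local $_ = $item;               # the block sees the current item in $_
        $code->();
    }
    return @gathered;                   # key => value pairs, ready for a hash
}

sub build_regex {
    my ($token) = @_;
    chop $token;
    return $token . 'a';
}

my @tokens = ('aa' .. 'zz');            # smaller than 'aaa'...'zzz' so it runs quickly

my %result = serial_loop {
    my $token = $_;
    my $regex = build_regex($token);
    my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
    gather($token => \@line_results) if @line_results;
} @tokens;

print scalar(keys %result), "\n";
```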

MCE::Child

Welcome to MCE::Child and MCE::Channel (both new). Unlike the queues in the prior two demonstrations, MCE::Channel comes with one important consideration: a channel behaves more like a pipe (i.e. the data resides in the OS-level socket buffer, not in a queue held in process memory). This has some advantages, such as not having to worry about the producer running faster than the consumers: once the buffer is full, the producer blocks until a consumer reads.
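To see why a pipe-like channel keeps the producer from running far ahead of its consumers, the sketch below opens a Unix-domain socket pair with Perl's built-in socketpair, switches the writing end to non-blocking mode, and writes until the kernel refuses more data. The count it prints is how much the OS buffers for an unread socket; a normal blocking writer would simply wait at that point. This illustrates the OS mechanism only; it is not MCE::Channel's code, and the buffer size varies by OS and settings.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Handle;
use Errno ();

# A connected pair of sockets: write on one end, read on the other.
socketpair(my $writer, my $reader, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair: $!";

# Non-blocking, so a full buffer shows up as EAGAIN instead of a hang.
$writer->blocking(0);

my $chunk    = 'x' x 1024;
my $buffered = 0;

while (1) {
    my $n = syswrite($writer, $chunk);
    if (!defined $n) {
        # Buffer full: a blocking writer would wait here for the reader.
        last if $!{EAGAIN} || $!{EWOULDBLOCK};
        die "syswrite: $!";
    }
    $buffered += $n;
}

printf "buffered %d bytes before the writer would block\n", $buffered;
```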

I omitted the doneQueue. Instead, workers store their results locally and send them upon termination. If anything, this shortens the code. The real reason (regarding MCE::Channel) is to avoid a deadlock. A doneQueue built on MCE::Channel could lead to workers blocking on a full result buffer while the manager process, still busy enqueuing work, is not yet dequeuing.

```perl
use strict;
use warnings;

use MCE::Child;
use MCE::Channel;
use Time::HiRes 'time';

my $workQueue  = MCE::Channel->new(impl => 'Mutex');
my $numWorkers = 4;

my @tokens = ('aaa' ... 'zzz');
my $start  = time;

sub build_regex {
    my ($token) = @_;
    chop $token;
    $token .= 'a';
}

MCE::Child->create(sub { DoWork($workQueue) }) for 1 .. $numWorkers;

$workQueue->enqueue($_) for @tokens;
$workQueue->end();

# Each worker returns a hash ref of its local results; merge them on join.
my %result = map { %{ $_->join() } } MCE::Child->list;

printf "duration: %0.03f seconds\n", time - $start;
print scalar(keys %result), "\n";  # 16900

exit;

sub DoWork {
    my ($workQueue) = @_;
    my %result;  # store locally

    while (my $token = $workQueue->dequeue()) {
        my $regex = build_regex($token);
        my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
        $result{$token} = \@line_results if @line_results;
    }

    return \%result;
}
```
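The same "store locally, send once" idea works without MCE. The sketch below uses plain fork, one pipe per worker, and the core Storable module's nstore_fd/fd_retrieve: each child builds its own %local hash and writes it as a single message just before exiting, and the parent merges the parts. For brevity it splits the tokens round-robin up front instead of pulling from a shared work channel, and it uses a two-letter token set so it finishes quickly; both are simplifications of my own, not part of the approach above.

```perl
use strict;
use warnings;
use Storable qw(nstore_fd fd_retrieve);

my @tokens     = ('aa' .. 'zz');   # small set so the sketch runs quickly
my $numWorkers = 4;

sub build_regex {
    my ($token) = @_;
    chop $token;
    return $token . 'a';
}

my @workers;   # [pid, read handle] per child

for my $id (0 .. $numWorkers - 1) {
    pipe(my $reader, my $writer) or die "pipe: $!";
    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: keep results in a local hash; no IPC until the end.
        close $reader;
        my %local;
        for my $i (grep { $_ % $numWorkers == $id } 0 .. $#tokens) {
            my $token = $tokens[$i];
            my $regex = build_regex($token);
            my @line_results = grep { $_ ne $token and /$regex/ } @tokens;
            $local{$token} = \@line_results if @line_results;
        }
        nstore_fd(\%local, $writer);   # one message per worker
        close $writer;
        exit 0;
    }

    # Parent: keep only the read end.
    close $writer;
    push @workers, [ $pid, $reader ];
}

my %result;
for my $w (@workers) {
    my ($pid, $reader) = @$w;
    my $part = fd_retrieve($reader);
    @result{ keys %$part } = values %$part;
    close $reader;
    waitpid($pid, 0);
}

print scalar(keys %result), "\n";
```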

The takeaway is that workers may store results locally and send them upon termination, which decreases IPC overhead. Not shown here are MCE's chunking capabilities.
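Chunking attacks the same overhead from the input side: with chunk_size => 1 (as in the MCE::Loop example), every token is a separate hand-off to a worker. The plain-Perl sketch below splits the 17,576 tokens into fixed-size chunks and counts how many hand-offs remain; the `chunks` helper is hypothetical and not MCE's API. With MCE itself, this is what the chunk_size option controls.

```perl
use strict;
use warnings;

# Split a list into array refs of at most $size items (hypothetical helper).
sub chunks {
    my ($size, @items) = @_;
    my @out;
    push @out, [ splice(@items, 0, $size) ] while @items;
    return @out;
}

my @tokens = ('aaa' .. 'zzz');   # 26**3 = 17576 tokens

for my $size (1, 100, 1000) {
    my @c = chunks($size, @tokens);
    printf "chunk size %4d: %5d hand-offs, last chunk holds %d tokens\n",
        $size, scalar(@c), scalar(@{ $c[-1] });
}
```

Fewer, larger hand-offs mean less IPC per token, at the cost of coarser load balancing across workers.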

Regards, Mario
