Beefy Boxes and Bandwidth Generously Provided by pair Networks
more useful options

ithreads, locks, shared data: is that OK?

by bliako (Vicar)
on Sep 18, 2018 at 23:11 UTC ( #1222616=perlquestion: print w/replies, xml ) Need Help??

bliako has asked for the wisdom of the Perl Monks concerning the following question:

I want some advice for when using Perl's ithreads to do parallel processing and return back the results.

For this particular situation I have one or more input words and a huge dictionary. I search the input word for similar words (given some metric) and return back these similar words. I repeat the process with these words as input and so on until I go into some depth and be able to create a Graph of a few hundred nodes.

Parallelisation: each thread checks for its input word via a Thread::Queue and when it gets one it calls a function ref which will know where the dictionary is and what to do to get the similar words out. I have included a mock function here which just gets random words as the other one is too long and convoluted.

Additional to the work-queue, I have three more Thread::Queue objs: to save words-currently-being-processed (so as not to re-process them), for words-done-i.e.-results, and for failures-of-the-distance-function.

I also have a couple of shared scalars : an integer to contain the total number of items processed and a flag to know if processing has to abort.

Finally, I have the huge dictionary which is absolutely readonly and I do not know whether I have a choice in just distributing a ref to each thread rather than duplicating it to each.

The test program works OK. (one needs a dictionary file which linux has at specified path or get 1MB worth from but I am not really sure I am doing the right things sometimes. For example where I am locking a Thread::Queue or a shared variable. And about passing references to scalars to the threads to keep track of the total number of items each thread processes. It's lovely in C but is it ok here?

Most importantly, is there a way to avoid duplicating that huge dictionary (without explicitly share()ing each and every entry of the array or hash.

I have another question, less important: I tried to use my own SIGINT handler to call stop() on Ctrl-C but it does not propagate all the way to the threads when threads start. I would say it is being completely overriden (by what?).

Here is the program as a test preamble and the package in one file, many thanks bliako:

#!/usr/bin/perl use strict; use warnings; select(STDERR); $| = 1; select(STDOUT); $| = 1; #use lib './'; #use Parler; my $par = undef; my $fh; open($fh, '<', '/usr/share/dict/words') || die "could not open diction +ary, $!\n"; my @dictionary = (); while( <$fh> ){ chomp; push @dictionary, $_ } close($fh); $par = Parler->new({ 'dictionary' => \@dictionary, }); my $results = $par->process({ 'num-threads' => 4, 'max-items' => 10, 'words' => ['pearl', 'jam'], }); die "error calling process()" if not defined($results); # This package does "lexical analysis"(mock) given a huge # dictionary. In parallel over a number of thread-workers. # Cost to pay: duplicates the read-only dictionary # over each thread. # OO style. # by bliako # 19/09/2018 package Parler; use strict; use warnings; use threads; use threads::shared; use Thread::Queue; use Thread::Stack; sub new { my $class = $_[0]; my $params = $_[1] // {}; my $self = { # an array of words but can also be a hash - not neste +d 'dictionary' => $params->{'dictionary'}, 'dictionary-size' => scalar( @{$params->{'dictionary'} +} ), # the stop-flag is a ref to an integer which is set to + 1 # whenever stop() is called. 'stop-now-flag' => &share(\my $whatever), }; bless($self, $class); ${$self->{'stop-now-flag'}} = 0; # Here is the complex sub to find synonyms. # However, I am using just a mock for now which # spits out random words from the dictionary as arrayref $self->{'find-synonyms-sub'} = sub { my $aword = shift; return [ map { $self->{'dictionary'}->[ int(rand($self->{'dictionary-size'} +)) ] } ( ] }; return $self } # a signal handler could just call this to abort... sub stop { ${$_[0]->{'stop-now-flag'}} = 1 } # spawns threads to do the processing. waits for the threads and retur +ns results # based on comments by BrowserUK and others on # L<How to create thread pool of ithreads| +ode_id=735923> sub process { my $self = $_[0]; my $params = $_[1] // {}; # required param my $inwords = $params->{'words'}; die "process() : 'words' needed." if ! defined($inwords); # optional input params: my $num_threads = $params->{'num-threads'} // 4; my $max_items_to_process : shared = $params->{'max-items'} // +20; my $work_Q = Thread::Queue->new(); my $failures_Q = Thread::Queue->new(); my $results_Q = Thread::Queue->new(); my $currently_working_on_Q = Thread::Queue->new(); my $num_items_processed_so_far : shared = 0; # spawn the workers my @thread_pool = (); for(1..$num_threads){ my $athread = threads->create( \&Parler::worker, # subref to worker $work_Q, $failures_Q, $results_Q, $currently_working_on_Q, \$num_items_processed_so_far, $max_items_to_process, $self->{'stop-now-flag'}, $self->{'find-synonyms-sub'}, time ); if( ! defined($athread) ){ print STDERR "process() : failed to create thr +ead.\n"; } else { push(@thread_pool, $athread); } } # now add all the specified input words and let it start # lock it and in a block so that the lock goes away when out { lock($work_Q); $work_Q->enqueue($_) for @$inwords; } # wait until all threads finish for(@thread_pool){ $_->join; print "process() : thread terminated and joined.\n"; } my $astr; # did we have failures? while( defined($astr=$failures_Q->dequeue_nb()) ){ print "process() : got failure : $astr\n"; } # any items still pending? while( defined($astr=$currently_working_on_Q->dequeue_nb()) ){ print "process() : words in unfinished items queue: $a +str\n"; } my @results; print "process() : done. Here are the results:\n"; while( defined($astr=$results_Q->dequeue_nb()) ){ my ($word, @synons) = split(/\t/, $astr); print " $word : ".join(", ", @synons)."\n"; push(@results, [$word, \@synons]); } print "process() : items found $num_items_processed_so_far (ma +x was $max_items_to_process).\n"; return \@results } sub worker { my ( $WQ, # work queue $FQ, # failures queue $RQ, # send-results-back queue $CWQ, # items-currently-working-on queue $num_items_done_ref, # how many items all workers have + completed $max_items_to_do, # how many items we need to shutdown $stop_now_ref, # a flag to stop if set to 1 $synonyms_finder_sub_ref, # a coderef to call to find +the synonyms $time_started, ) = @_; my $tid = threads->tid(); print "thread $tid : started in ".(time-$time_started)."s. syn +onyms_finder_sub_ref=$synonyms_finder_sub_ref\n"; my $time_started2 = time; my ($word, $astr, $i); MAINLOOP: while( $$num_items_done_ref <= $max_items_to_do and $$stop_now_ref == 0 ){ print "thread $tid : looping and stop is ".$$stop_now_ +ref."\n"; # wait for work to appear for up the specified # time (do not set to zero or use a sleep) # or go back to check the loop conditions # our job is to get the next word from the Q # and find its synonyms next unless defined($word=$WQ->dequeue_timed(1)); print "thread $tid : got a word : '$word'\n"; # is anyone else working on this word? { lock($CWQ); # lock it first for($i=$CWQ->pending();$i-->0;){ next MAINLOOP if $CWQ->peek($i) =~ /^$ +word\t/ } } print "thread $tid : nobody works on this word: $word\ +n"; # word has been explored, don't do it again { lock($RQ); # lock it first for($i=$RQ->pending();$i-->0;){ next MAINLOOP if $RQ->peek($i) =~ /^$w +ord\t/ } } print "thread $tid : word has not been explored: $word +\n"; # fine we proceed: print "thread $tid : working on word $word\n"; # 1. register we are doing this word { lock($CWQ); $CWQ->enqueue($word."\t".$tid); } # 2. process it calling the sub-ref and possibly other +s # results is an array ref of synonyms my $synons = $synonyms_finder_sub_ref->($word); if( defined($synons) ){ print "thread $tid : got synonyms for '$word' +: ".join(", ", @$synons)."\n"; # word has been processed ok # 1. insert it to the results { lock($RQ); $RQ->enqueue(join("\t", $word, @$synon +s)); } # 2. insert all synonyms found into the work q +ueue # so all threads to explore them { lock($WQ); $WQ->enqueue($_) for @$synons; } # 3. update the total number of items complete +d (will shutdown when max) { lock($num_items_done_ref); $$num_items_done_ref = $$num_items_don +e_ref + scalar(@$synons); } } else { # oops error print STDERR "thread $tid : call to synonyms_f +inder_sub_ref() has failed for word : $word. Skipping this word ...\n +"; { lock($FQ); $FQ->enqueue("thread $tid : failed for + word '$word'"); } } # 4. remove it from the currently-working hash (even i +f failed) { lock($CWQ); for($i=$CWQ->pending();$i-->0;){ $CWQ->extract($i) if $CWQ->peek($i) =~ + /^$word\t/ } } } print "thread $tid : shut down after ".(time-$time_started2)." +s.\n"; } 1;

Replies are listed 'Best First'.
Re: ithreads, locks, shared data: is that OK?
by choroba (Bishop) on Sep 19, 2018 at 13:44 UTC
    Too many questions and lines of code in one post :-)

    Locking a Thread::Queue shouldn't be needed, that's what the module does for you.

    If you need the dictionary to be fast, there's no other option than to share it or duplicate it. If it could be slow, you can create another worker with its own queues that would update the dictionary and answer queries about its content. Basically, when you use Thread::Queue, you shouldn't need threads::shared at all.

    ($q=q:Sq=~/;[c](.)(.)/;chr(-||-|5+lengthSq)`"S|oS2"`map{chr |+ord }map{substrSq`S_+|`|}3E|-|`7**2-3:)=~y+S|`+$1,++print+eval$q,q,a,

      Sorry, I wanted to post the complete program.

      Regarding the locking of Thread::Queue, I saw in their doc (Advanced Methods) how to lock queue. And decided to do the same. Do you know what is the purpose of the lock() in the manual?

      As for the dictionary, thanks for the suggestion. I will investigate if the performance is OK.

      So, thanks for the info and sorry for the long code, bliako.

        You don't need to lock the queue for enqueue and dequeue. You need it for peek, but why do you need peek? Enqueue and dequeue should be all you need :-)

        ($q=q:Sq=~/;[c](.)(.)/;chr(-||-|5+lengthSq)`"S|oS2"`map{chr |+ord }map{substrSq`S_+|`|}3E|-|`7**2-3:)=~y+S|`+$1,++print+eval$q,q,a,

Log In?

What's my password?
Create A New User
Node Status?
node history
Node Type: perlquestion [id://1222616]
Approved by Paladin
Front-paged by stevieb
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others about the Monastery: (7)
As of 2019-11-21 08:28 GMT
Find Nodes?
    Voting Booth?
    Strict and warnings: which comes first?

    Results (104 votes). Check out past polls.