http://www.perlmonks.org?node_id=1222616

bliako has asked for the wisdom of the Perl Monks concerning the following question:

I would like some advice on using Perl's ithreads to do parallel processing and return the results.

For this particular situation I have one or more input words and a huge dictionary. For each input word I search the dictionary for similar words (given some metric) and return these similar words. I then repeat the process with these words as input, and so on, until I reach some depth and can build a graph of a few hundred nodes.

Parallelisation: each thread waits for an input word on a Thread::Queue, and when it gets one it calls a function ref, which knows where the dictionary is and how to get the similar words out of it. I have included a mock function here which just returns random words, as the real one is too long and convoluted.

In addition to the work queue, I have three more Thread::Queue objects: one for words currently being processed (so they are not processed twice), one for finished words, i.e. the results, and one for failures of the distance function.
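For the words-currently-being-processed queue, the check ("is anyone already working on this word?") and the registration of the word have to happen under one lock, otherwise two threads can both see the word as free and both process it. A minimal sketch of that idea, assuming the queue holds plain word strings rather than the word-TAB-tid entries used in the program; `claim_word()` is a hypothetical helper, not part of the original code:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Hypothetical helper (not from the original program): returns 1 if the
# calling thread registered $word in $q, 0 if it was already registered.
sub claim_word {
    my ($q, $word) = @_;
    lock($q);    # held for the whole scan AND the enqueue
    for my $i (0 .. $q->pending() - 1) {
        return 0 if $q->peek($i) eq $word;
    }
    $q->enqueue($word);    # still inside the same lock
    return 1;
}

my $in_progress = Thread::Queue->new();
my @words = map { "word$_" } 1 .. 10;

# Four threads race to claim the same ten words.
my @pool = map {
    threads->create(sub {
        my $claimed = 0;
        $claimed += claim_word($in_progress, $_) for @words;
        return $claimed;
    });
} 1 .. 4;

my $total_claims = 0;
for my $thr (@pool) {
    my ($claimed) = $thr->join;
    $total_claims += $claimed;
}
print "claims: $total_claims, queued: ", $in_progress->pending(), "\n";
```

Each Thread::Queue method already locks the queue internally, but only for that single call; the explicit `lock($q)` is what makes the peek-then-enqueue sequence atomic, so every word ends up claimed exactly once.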

I also have two shared scalars: an integer holding the total number of items processed and a flag signalling that processing must abort.
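Passing a reference to a shared scalar into each thread behaves much like passing a pointer in C, provided the read-modify-write happens under `lock()`. A minimal sketch, where `count_items()` is a hypothetical stand-in for the worker and `$items_done` for the shared total:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;

my $items_done :shared = 0;

# Hypothetical worker: receives a reference to the shared counter,
# much like an int* handed to a pthread in C.
sub count_items {
    my ($done_ref, $n) = @_;
    for (1 .. $n) {
        lock($$done_ref);    # lock the shared scalar itself
        $$done_ref++;        # increment is only safe while the lock is held
    }                        # lock released at the end of each iteration
    return;
}

my @pool = map { threads->create(\&count_items, \$items_done, 1000) } 1 .. 4;
$_->join for @pool;
print "items done: $items_done\n";
```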

Finally, I have the huge dictionary, which is strictly read-only, and I do not know whether I can just pass a reference to it to each thread rather than duplicating it for each one.

The test program works OK (it needs a dictionary file, which Linux has at the path specified, or you can get 1MB worth from https://gist.githubusercontent.com/wchargin/8927565/raw/d9783627c731268fb2935a731a618aa8e95cf465/words), but sometimes I am not really sure I am doing the right thing. For example, where I lock a Thread::Queue or a shared variable. Or passing references to scalars to the threads to keep track of the total number of items they process: it's lovely in C, but is it OK here?

Most importantly, is there a way to avoid duplicating that huge dictionary (without explicitly share()ing each and every entry of the array or hash)?
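One approach worth trying, sketched under assumptions: load the words straight into a `:shared` array before any thread is created, so the data lives once in threads::shared storage and each thread only receives a reference. I would expect two trade-offs: every access goes through threads::shared, so lookups are slower than on a private array, and any non-shared copy still in scope when threads are created (such as `@dictionary` in the test preamble, or the closure over `$self` in `new()`) would still be cloned into every thread. `count_matches()` and the generated word list are hypothetical stand-ins for the real search and `/usr/share/dict/words`:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;

# Assumption: words go directly into a shared array, so the main thread
# holds no private copy for threads->create to clone.
# The real program would read them from the dictionary file instead.
my @dictionary :shared;
push @dictionary, "word$_" for 0 .. 999;

# Hypothetical worker: scans a slice of the shared dictionary via a reference.
sub count_matches {
    my ($dict_ref, $from, $to) = @_;
    my $count = 0;
    for my $i ($from .. $to) {
        $count++ if $dict_ref->[$i] =~ /7$/;
    }
    return $count;
}

# Four threads, each scanning 250 entries of the same shared array.
my @pool = map {
    threads->create(\&count_matches, \@dictionary, $_ * 250, $_ * 250 + 249)
} 0 .. 3;

my $matches = 0;
for my $thr (@pool) {
    my ($count) = $thr->join;
    $matches += $count;
}
print "words ending in 7: $matches\n";
```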

I have another, less important question: I tried to use my own SIGINT handler to call stop() on Ctrl-C, but once the threads have started it does not propagate all the way to them. I would say it is being completely overridden (by what?).
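One likely explanation, not verified against this program: the threads documentation states that signals are caught by the main thread, and with Perl's deferred ("safe") signals a `%SIG` handler only runs between Perl ops, so a main thread sitting in a blocking `join()` may never get to run it until the workers finish. A sketch under that assumption polls `is_running()` with `sleep` instead of joining straight away; the `kill` line only simulates Ctrl-C for the demo, and `$stop_now` stands in for the stop flag:

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;

my $stop_now :shared = 0;
$SIG{INT} = sub { $stop_now = 1 };    # stand-in for calling $par->stop()

# Workers spin until the shared stop flag is set.
my @pool = map {
    threads->create(sub {
        my $loops = 0;
        until ($stop_now) {
            $loops++;
            select(undef, undef, undef, 0.05);    # sleep 50 ms
        }
        return $loops;
    });
} 1 .. 4;

# Poll rather than block in join(), so the main thread keeps reaching
# points where Perl can run the deferred %SIG handler (assumption above).
my ($ticks, $timed_out) = (0, 0);
while (grep { $_->is_running } @pool) {
    sleep 1;
    $ticks++;
    kill 'INT', $$ if $ticks == 1;    # demo only: simulate Ctrl-C
    if ($ticks > 10) {                # safety net so the demo cannot hang
        $timed_out = 1;
        $stop_now  = 1;
    }
}
$_->join for @pool;
print "stopped after $ticks s (timed out: $timed_out)\n";
```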

Here is the program, a test preamble followed by the package, all in one file. Many thanks, bliako:

#!/usr/bin/perl
use strict;
use warnings;

select(STDERR); $| = 1;
select(STDOUT); $| = 1;

#use lib './';
#use Parler;

my $par = undef;
my $fh;
open($fh, '<', '/usr/share/dict/words') || die "could not open dictionary, $!\n";
my @dictionary = ();
while( <$fh> ){ chomp; push @dictionary, $_ }
close($fh);

$par = Parler->new({
    'dictionary' => \@dictionary,
});
my $results = $par->process({
    'num-threads' => 4,
    'max-items' => 10,
    'words' => ['pearl', 'jam'],
});
die "error calling process()" if not defined($results);

# This package does "lexical analysis"(mock) given a huge
# dictionary. In parallel over a number of thread-workers.
# Cost to pay: duplicates the read-only dictionary
# over each thread.
# OO style.
# by bliako
# 19/09/2018
package Parler;

use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Queue;

sub new {
    my $class = $_[0];
    my $params = $_[1] // {};

    my $self = {
        # an array of words but can also be a hash - not nested
        'dictionary' => $params->{'dictionary'},
        'dictionary-size' => scalar( @{$params->{'dictionary'}} ),
        # the stop-flag is a ref to an integer which is set to 1
        # whenever stop() is called.
        'stop-now-flag' => &share(\my $whatever),
    };
    bless($self, $class);
    ${$self->{'stop-now-flag'}} = 0;

    # Here is the complex sub to find synonyms.
    # However, I am using just a mock for now which
    # spits out random words from the dictionary as arrayref
    $self->{'find-synonyms-sub'} = sub {
        my $aword = shift;
        return [ map { $self->{'dictionary'}->[ int(rand($self->{'dictionary-size'})) ] } (0..int(rand(3))) ]
    };
    return $self
}

# a signal handler could just call this to abort...
sub stop { ${$_[0]->{'stop-now-flag'}} = 1 }

# spawns threads to do the processing. waits for the threads and returns results
# based on comments by BrowserUK and others on
# L<How to create thread pool of ithreads|https://www.perlmonks.org/?node_id=735923>
sub process {
    my $self = $_[0];
    my $params = $_[1] // {};

    # required param
    my $inwords = $params->{'words'};
    die "process() : 'words' needed." if ! defined($inwords);

    # optional input params:
    my $num_threads = $params->{'num-threads'} // 4;
    my $max_items_to_process : shared = $params->{'max-items'} // 20;

    my $work_Q = Thread::Queue->new();
    my $failures_Q = Thread::Queue->new();
    my $results_Q = Thread::Queue->new();
    my $currently_working_on_Q = Thread::Queue->new();
    my $num_items_processed_so_far : shared = 0;

    # spawn the workers
    my @thread_pool = ();
    for(1..$num_threads){
        my $athread = threads->create(
            \&Parler::worker, # subref to worker
            $work_Q,
            $failures_Q,
            $results_Q,
            $currently_working_on_Q,
            \$num_items_processed_so_far,
            $max_items_to_process,
            $self->{'stop-now-flag'},
            $self->{'find-synonyms-sub'},
            time
        );
        if( ! defined($athread) ){
            print STDERR "process() : failed to create thread.\n";
        } else {
            push(@thread_pool, $athread);
        }
    }
    # now add all the specified input words and let it start
    # lock it and in a block so that the lock goes away when out
    {
        lock($work_Q);
        $work_Q->enqueue($_) for @$inwords;
    }
    # wait until all threads finish
    for(@thread_pool){
        $_->join;
        print "process() : thread terminated and joined.\n";
    }
    my $astr;
    # did we have failures?
    while( defined($astr=$failures_Q->dequeue_nb()) ){
        print "process() : got failure : $astr\n";
    }
    # any items still pending?
    while( defined($astr=$currently_working_on_Q->dequeue_nb()) ){
        print "process() : words in unfinished items queue: $astr\n";
    }
    my @results;
    print "process() : done. Here are the results:\n";
    while( defined($astr=$results_Q->dequeue_nb()) ){
        my ($word, @synons) = split(/\t/, $astr);
        print "  $word : ".join(", ", @synons)."\n";
        push(@results, [$word, \@synons]);
    }
    print "process() : items found $num_items_processed_so_far (max was $max_items_to_process).\n";
    return \@results
}

sub worker {
    my (
        $WQ, # work queue
        $FQ, # failures queue
        $RQ, # send-results-back queue
        $CWQ, # items-currently-working-on queue
        $num_items_done_ref, # how many items all workers have completed
        $max_items_to_do, # how many items we need to shutdown
        $stop_now_ref, # a flag to stop if set to 1
        $synonyms_finder_sub_ref, # a coderef to call to find the synonyms
        $time_started,
    ) = @_;

    my $tid = threads->tid();
    print "thread $tid : started in ".(time-$time_started)."s. synonyms_finder_sub_ref=$synonyms_finder_sub_ref\n";
    my $time_started2 = time;

    my ($word, $astr, $i);
    MAINLOOP: while( $$num_items_done_ref <= $max_items_to_do and $$stop_now_ref == 0 ){
        print "thread $tid : looping and stop is ".$$stop_now_ref."\n";
        # wait for work to appear for up the specified
        # time (do not set to zero or use a sleep)
        # or go back to check the loop conditions
        # our job is to get the next word from the Q
        # and find its synonyms
        next unless defined($word=$WQ->dequeue_timed(1));
        print "thread $tid : got a word : '$word'\n";

        # The checks and the registration must be atomic: keep $CWQ
        # locked until the word is enqueued, otherwise two threads can
        # both pass the checks for the same word.
        # \Q...\E matches the word literally, not as a regex.
        {
            lock($CWQ);
            # is anyone else working on this word?
            for($i=$CWQ->pending();$i-->0;){
                next MAINLOOP if $CWQ->peek($i) =~ /^\Q$word\E\t/
            }
            print "thread $tid : nobody works on this word: $word\n";
            # word has been explored, don't do it again
            # (a result with no synonyms has no trailing tab)
            {
                lock($RQ);
                for($i=$RQ->pending();$i-->0;){
                    next MAINLOOP if $RQ->peek($i) =~ /^\Q$word\E(?:\t|\z)/
                }
            }
            print "thread $tid : word has not been explored: $word\n";
            # fine we proceed:
            print "thread $tid : working on word $word\n";
            # 1. register we are doing this word (still under the $CWQ lock)
            $CWQ->enqueue($word."\t".$tid);
        }
        # 2. process it calling the sub-ref and possibly others
        # results is an array ref of synonyms
        my $synons = $synonyms_finder_sub_ref->($word);
        if( defined($synons) ){
            print "thread $tid : got synonyms for '$word' : ".join(", ", @$synons)."\n";
            # word has been processed ok
            # 1. insert it to the results
            {
                lock($RQ);
                $RQ->enqueue(join("\t", $word, @$synons));
            }
            # 2. insert all synonyms found into the work queue
            # so all threads to explore them
            {
                lock($WQ);
                $WQ->enqueue($_) for @$synons;
            }
            # 3. update the total number of items completed (will shutdown when max)
            {
                lock($num_items_done_ref);
                $$num_items_done_ref = $$num_items_done_ref + scalar(@$synons);
            }
        } else {
            # oops error
            print STDERR "thread $tid : call to synonyms_finder_sub_ref() has failed for word : $word. Skipping this word ...\n";
            {
                lock($FQ);
                $FQ->enqueue("thread $tid : failed for word '$word'");
            }
        }
        # 4. remove it from the currently-working queue (even if failed)
        {
            lock($CWQ);
            for($i=$CWQ->pending();$i-->0;){
                $CWQ->extract($i) if $CWQ->peek($i) =~ /^\Q$word\E\t/
            }
        }
    }
    print "thread $tid : shut down after ".(time-$time_started2)."s.\n";
}
1;