
A basic 'worker' threading example.

by Preceptor (Deacon)
on Dec 29, 2013 at 12:15 UTC

This post is aimed at the people who have heard of Perl threading, and think it intriguing - but haven't really gotten to grips with how it's done. I'm going to put together a ... template, if you like, for a very basic style of threading.

Parallel code is somewhat hazardous for the unwary: because you're 'splitting' your program and making different parts run at different speeds, you can end up with some incredibly frustrating, hard-to-track bugs. Every thread is a race condition waiting to happen. So all the bad habits you've picked up when coding in Perl may well come back to bite you if you 'thread it'.
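A minimal sketch of what that means in practice (an illustration, not part of the original post): several threads updating one counter. Ordinary variables are copied into each thread, so the counter has to be declared `:shared`, and each read-modify-write is wrapped in `lock()` so two threads can't interleave their updates. The thread and iteration counts are arbitrary.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

# Illustrative sizes, not taken from the post.
my $n_threads  = 4;
my $increments = 1000;

# Shared between all threads. A plain 'my' variable would be copied
# into each thread, and the main thread would never see the updates.
my $counter :shared = 0;

sub bump {
    for ( 1 .. $increments ) {
        # lock() is held until the end of the enclosing block (this loop
        # iteration), so no other thread can interleave with the ++ below.
        lock($counter);
        $counter++;
    }
    return;
}

my @workers = map { threads->create( \&bump ) } 1 .. $n_threads;
$_->join() for @workers;

print "counter = $counter\n";    # counter = 4000
```

Remove the `lock($counter);` line and the final count can come out lower than 4000, though not reliably so, which is exactly what makes this class of bug hard to track down.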

The simplest thing to thread is what's known as an 'Embarrassingly Parallel' problem: one where there are multiple tasks, but no dependencies between them, and no communication or synchronisation is needed.

When writing parallel code, you start to think in terms of scalability and efficiency: every thread start has an overhead, and so does every communication between threads. However, the most 'expensive' task is synchronising all your threads, because they all have to wait until the slowest thread 'catches up'.

Thankfully, an 'embarrassingly parallel' problem has no communication or synchronisation to pay for.

An example I might use is pinging 1000 servers. You want to ping each of them, but you don't need to do so in any particular order. However, if a server is offline, then a 'ping' will wait for a timeout, making the process a lot slower.

The only thing you have to worry about is that if you ping them 'all at once', you might end up sending a lot of traffic across the network.
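Capping the number of workers is what stops 'all at once' from happening: with a fixed pool, at most that many pings are in flight at any moment, however long the list is. The sketch below rests on some assumptions (a pool of 3, twenty hypothetical host names, and a hypothetical `fake_ping` that just sleeps instead of touching the network) and records the peak number of tasks running concurrently so the cap can be checked.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw(sleep);

my $nthreads = 3;                         # pool size (illustrative)
my @servers  = map "host$_", 1 .. 20;     # hypothetical server names

my $active :shared = 0;    # tasks currently running
my $peak   :shared = 0;    # highest value $active has reached

# Hypothetical stand-in for a real ping: just waits briefly.
sub fake_ping { sleep 0.05; return 1 }

my $q = Thread::Queue->new(@servers);
$q->end();                 # dequeue() returns undef once the queue is empty

sub worker {
    while ( defined( my $server = $q->dequeue() ) ) {
        {
            lock($active);
            $active++;
            $peak = $active if $active > $peak;
        }
        fake_ping($server);
        {
            lock($active);
            $active--;
        }
    }
    return;
}

my @pool = map { threads->create( \&worker ) } 1 .. $nthreads;
$_->join() for @pool;

print "peak concurrency: $peak (pool size $nthreads)\n";
```

However many servers are queued, `$peak` can never exceed the number of worker threads, because each thread handles one task at a time.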

This is a near-perfect example of a type of problem I encounter regularly, so I'll use it as the example code.

Perl actually has quite a good way of 'spotting' embarrassingly parallel work: a 'foreach' loop is often a good sign.

If you're doing the same thing to every item in a list, then there's a good chance that the work is suitable for parallelisation. You may not gain a large advantage from doing it, though: the real advantage of threading is in making use of multiple system resources (processors, network sockets, etc.). It's not the only way of achieving that result, and a threaded program will, as a result, 'hog' more of a system's resources while it runs (but hopefully for less time).
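A rough sketch of that conversion, assuming the per-item work really is independent: `slow_square` is a hypothetical stand-in for real work, and the pool size of 4 is arbitrary. The loop body doesn't change; only where the items come from (a work queue) and where the results go (a result queue) does. Results arrive in completion order rather than list order, so they are keyed by input.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my @items = 1 .. 10;

# Hypothetical per-item work; no item depends on any other.
sub slow_square {
    my ($n) = @_;
    select( undef, undef, undef, 0.01 );    # pretend this takes a while
    return $n * $n;
}

# Serial version: the 'foreach' that hints at parallelism.
my %serial;
foreach my $n (@items) {
    $serial{$n} = slow_square($n);
}

# Threaded version: the same work, fed from a queue to a fixed pool.
my $work_q   = Thread::Queue->new(@items);
my $result_q = Thread::Queue->new();
$work_q->end();

sub square_worker {
    while ( defined( my $n = $work_q->dequeue() ) ) {
        # Thread::Queue makes a shared copy of the array ref.
        $result_q->enqueue( [ $n, slow_square($n) ] );
    }
    return;
}

my @pool = map { threads->create( \&square_worker ) } 1 .. 4;
$_->join() for @pool;

# Results arrive in completion order, so key them by input.
my %parallel;
while ( defined( my $pair = $result_q->dequeue_nb() ) ) {
    $parallel{ $pair->[0] } = $pair->[1];
}
```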

To break down the task:

  • We create some 'worker' threads, that do a very simple 'run a command' operation (in this case, ping).
  • We define a list of servers (read from a file), and use the Thread::Queue module to handle queuing.
  • We wait for thread completion, and collate errors.

Which looks a bit like this:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;

my $nthreads = 5;

my $process_q = Thread::Queue -> new();
my $failed_q  = Thread::Queue -> new();

#this is a subroutine, but one that runs 'as a thread'.
#when it starts, it inherits the program state 'as is'. E.g.
#the variable declarations above all apply - but changes to
#values within the program are 'thread local' unless the
#variable is declared as 'shared'.
#Behind the scenes, Thread::Queue objects are 'shared' arrays.

sub worker {
    #NB - this will sit in a loop indefinitely, until you end the queue
    #using $process_q -> end
    #we do this once we've queued all the things we want to process,
    #and the sub then completes and exits neatly.
    #however if you _don't_ end it, this will sit waiting forever.
    #('defined' so that a false-looking value such as '0' can't end the loop early.)
    while ( defined( my $server = $process_q -> dequeue() ) ) {
        chomp ( $server );
        print threads -> self() -> tid(). ": pinging $server\n";
        my $result = `/bin/ping -c 1 $server`;
        if ( $? ) { $failed_q -> enqueue ( $server ) }
        print $result;
    }
}

#insert tasks into thread queue.
open ( my $input_fh, "<", "server_list" ) or die $!;
$process_q -> enqueue ( <$input_fh> );
close ( $input_fh );

#we 'end' process_q - when we do, no more items may be inserted,
#and 'dequeue' returns 'undefined' when the queue is emptied.
#this means our worker threads (in their 'while' loop) will then exit.
$process_q -> end();

#start some threads
for ( 1..$nthreads ) {
    threads -> create ( \&worker );
}

#Wait for threads to all finish processing.
foreach my $thr ( threads -> list() ) {
    $thr -> join();
}

#collate results. ('synchronise' operation)
while ( defined( my $server = $failed_q -> dequeue_nb() ) ) {
    print "$server failed to ping\n";
}
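One hazard worth flagging in the worker above: backticks hand `/bin/ping -c 1 $server` to the shell, so a line in server_list containing shell metacharacters would be interpreted by the shell. A list-form pipe `open` with more than one argument runs the program directly, with no shell involved. The sketch below wraps that in a hypothetical helper, `run_command`, which returns the output and leaves the exit status in `$?` much as backticks do. It's exercised with `$^X` (the running perl) instead of ping so it works without a network, and it assumes a Unix-like system, as the original's /bin/ping does.

```perl
use strict;
use warnings;

# Hypothetical helper: run a program without a shell and capture stdout.
# With more than one element in @argv, nothing is shell-interpreted.
sub run_command {
    my (@argv) = @_;
    open( my $fh, '-|', @argv ) or die "Can't run $argv[0]: $!";
    local $/;                   # slurp all output at once
    my $output = <$fh>;
    close($fh);                 # sets $? to the child's exit status
    return defined $output ? $output : '';
}

# In the worker, this would replace the backticks:
#   my $result = run_command( '/bin/ping', '-c', '1', $server );
#   if ( $? ) { $failed_q->enqueue($server) }

my $out    = run_command( $^X, '-e', 'print "hello"; exit 3' );
my $status = $? >> 8;
print "output=$out status=$status\n";    # output=hello status=3
```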

Now, this _is_ a very simple model of a 'threaded' task - and it will only suit situations where there are no dependencies on the results.

Re: A basic 'worker' threading example.
by Random_Walk (Prior) on Dec 29, 2013 at 13:46 UTC

    Nice post. I never used q->end() before, but enqueued an undef for each thread I had spawned. From now on I will go with the end method.

    Normally I start the threads before I start putting work on the queue. In this case it won't make much difference unless the list of hosts to ping is very large, or the file being read is on the other side of a very slow network link. But sometimes the process sourcing the list of work is comparatively slow, so letting the threads get to work sooner can help.
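    A sketch of that style, which also works with Thread::Queue versions that predate `end()`: the workers are started first and block in `dequeue()` until work arrives, and after the real work, one `undef` per worker is queued as a stop marker. The letters 'a' to 'j' are placeholder work items.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $nthreads = 3;
my $work_q   = Thread::Queue->new();
my $done_q   = Thread::Queue->new();

sub worker {
    # dequeue() blocks until something arrives; an undef means "stop".
    while ( defined( my $item = $work_q->dequeue() ) ) {
        $done_q->enqueue("processed $item");
    }
    return;
}

# Start the workers first: they wait in dequeue() until work arrives.
my @pool = map { threads->create( \&worker ) } 1 .. $nthreads;

# Feed the work (in real code this might be a slow producer).
$work_q->enqueue($_) for 'a' .. 'j';

# One stop marker per worker, queued after all the real work.
$work_q->enqueue(undef) for 1 .. $nthreads;

$_->join() for @pool;
my $processed = $done_q->pending();
print "$processed items processed\n";    # 10 items processed
```

    Because the queue is FIFO, every real item is dequeued before any worker sees a stop marker, so nothing is left unprocessed when the joins return.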


    Pereant, qui ante nos nostra dixerunt!

      q -> end is something that only newer versions of the Thread::Queue library support, which is why I didn't use it initially either; I worked around it with some sort of fiddle with pending, semaphores, or queuing a load of 'exit' codes. As an alternative, I might offer $thr->kill. If you install a handler for 'SIGTERM' inside the thread (SIGTERM is a catchable signal anyway; it's SIGKILL that can't be trapped), you can send 'SIGTERM' to that thread to force it to exit. E.g.:

      #add to thread:
      $SIG{'TERM'} = sub { threads->exit(); };

      ## just before the 'join' add:
      $thr -> kill ( "SIGTERM" );

      This isn't a 'real' SIGTERM: the threads module emulates the signal within the process rather than having the OS deliver it, but it's a good way of getting a thread to terminate preemptively. (Although I'd note that, because it's not a 'proper' SIGTERM, it may be delayed until the operation the thread is currently running has finished.)
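      A self-contained sketch of that signal-based shutdown (the endless loop is a placeholder for real work): the handler is installed inside the thread, as in the snippet above, and a small handshake queue makes sure the main thread doesn't send the signal before the thread has a handler for it.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $ready_q = Thread::Queue->new();

sub worker {
    # Install the handler inside the thread, as in the snippet above.
    # threads->exit() ends just this thread, not the whole program.
    $SIG{'TERM'} = sub { threads->exit(); };

    # Handshake: tell the main thread the handler is now in place.
    $ready_q->enqueue('ready');

    while (1) {
        # Placeholder for real work. The emulated signal is acted on
        # between operations, so short steps let it take effect quickly.
        select( undef, undef, undef, 0.05 );
    }
}

my $thr = threads->create( \&worker );
$ready_q->dequeue();    # block until the worker's handler is installed

# kill() returns the thread object, so join() can be chained onto it.
$thr->kill('TERM')->join();
print "worker stopped\n";
```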

      I would normally start the threads first, but dislike doing it if it might 'die' on that file open. There are neater ways of doing it, but I thought to err on the side of brevity.
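      One way to get both, sketched under some assumptions (a temporary file stands in for server_list, and the worker just records names instead of pinging): open the file before any thread exists, so a failed `open` dies while the program is still single-threaded, then start the workers, then queue lines as they're read so work begins before the whole file has been read.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use File::Temp qw(tempfile);

# Stand-in for 'server_list' so the sketch runs on its own.
my ( $tmp_fh, $server_list ) = tempfile();
print {$tmp_fh} "$_\n" for qw(alpha beta gamma delta);
close($tmp_fh);

my $nthreads  = 2;
my $process_q = Thread::Queue->new();
my $seen_q    = Thread::Queue->new();

sub worker {
    while ( defined( my $server = $process_q->dequeue() ) ) {
        chomp($server);
        $seen_q->enqueue($server);    # a real worker would ping here
    }
    return;
}

# 1. Open first: if this dies, no threads have been started yet.
open( my $input_fh, '<', $server_list ) or die "Can't open $server_list: $!";

# 2. Start the workers; they block in dequeue() until lines arrive.
my @pool = map { threads->create( \&worker ) } 1 .. $nthreads;

# 3. Queue each line as it's read, then end the queue.
while ( my $line = <$input_fh> ) {
    $process_q->enqueue($line);
}
close($input_fh);
$process_q->end();

$_->join() for @pool;
unlink $server_list;

my @seen;
while ( defined( my $s = $seen_q->dequeue_nb() ) ) {
    push @seen, $s;
}
@seen = sort @seen;
print "@seen\n";    # alpha beta delta gamma
```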

Re: A basic 'worker' threading example.
by zentara (Archbishop) on Dec 30, 2013 at 12:20 UTC
    It may be that my mind has been warped by trying to get threads to work well with event-loop systems, but this approach, from quite long ago, is a way my mind devised to get reusable threads, i.e. a pool of sleeping worker threads. I hope you find it fascinating. :-) BTW, yes, I know you can avoid the GOTOs and LABELs with proper loop construction, but I like the old dog trick of using goto. :-)
    #!/usr/bin/perl
    use strict;
    use warnings;
    use threads;
    use threads::shared;

    my %shash;
    my @to_be_processed = ('a'..'z');
    my @ready:shared = ();
    my $numworkers = 10;

    foreach my $dthread(1..$numworkers){
        share $shash{$dthread}{'go'};
        $shash{$dthread}{'go'} = 0;

        share $shash{$dthread}{'fileno'};   #in case you want
        $shash{$dthread}{'fileno'} = '';    #shared filehandles

        share $shash{$dthread}{'data'};
        $shash{$dthread}{'data'} = '';

        share $shash{$dthread}{'pid'};
        $shash{$dthread}{'pid'} = -1;

        share $shash{$dthread}{'die'};
        $shash{$dthread}{'die'} = 0;

        $shash{$dthread}{'thread'} = threads->new(\&worker, $dthread);
        push @ready, $dthread;
    }

    print "\t\t",scalar @ready," threads a key to start threads\n";
    <>;

    while (my $t = shift(@ready)){
        $shash{$t}{'data'} = shift @to_be_processed;
        $shash{$t}{'go'} = 1;
    }

    while(1){
        if( scalar @ready > 0 ){
            if( my $data = shift @to_be_processed ){
                my $t = shift(@ready);
                $shash{$t}{'data'} = $data;
                $shash{$t}{'go'} = 1;
                print "thread $t restarting\n";
            }else{
                print "out of input\n";
                goto WAIT;
            }
        }
    }

    WAIT:
    print "\n\n\nWAITING FOR FINISH\n\n\n";
    while(1){
        print "\n\n\n",scalar @ready," are ready\n\n\n";
        if ( scalar @ready < $numworkers ){
            sleep 1
        }else{
            foreach my $t(@ready){
                $shash{$t}{'die'} = 1;
                $shash{$t}{'thread'}->join;
                print "joining thread $t\n";
            }
            exit;
        }
    }

    sub worker{
        my $thr_num = shift;
        print "$thr_num started\n";
        my $count;

        START:
        while(1){
            if( $shash{$thr_num}{'die'} ){
                print "thread $thr_num finishing\n";
                return
            }

            #wait for $go_control
            if($shash{$thr_num}{'go'}){
                if($shash{$thr_num}{'die'}){
                    print "thread finishing\n";
                    return
                }

                $count++;
                my $str = ' 'x$thr_num;   #printout spacer
                print $str.$thr_num.'->'.$count.$shash{$thr_num}{'data'},"\n";

                if ($count > 10){ goto RECYCLE; }

                #select(undef,undef,undef,.25);
                sleep rand 5
            }else{
                $count = 0;
                select(undef,undef,undef,.25);
            }# sleep until awakened
        } #end while(1)

        RECYCLE:
        $shash{$thr_num}{'go'} = 0;
        print "$thr_num done....going back to sleep\n";
        $shash{$thr_num}{'data'} = '';
        $count = 0;
        push @ready, $thr_num;
        print "pushing $thr_num\n";
        goto START;
        return;
    }
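    For comparison, here's a sketch of the same 'pool of sleeping workers' idea built on Thread::Queue rather than shared flags and polling (a swapped-in technique, not zentara's method). A worker blocked in `dequeue()` sleeps without waking up to check a flag, and picks up the next job whenever one is queued, so the same threads can serve several batches. The two batches and the `tid:job` result strings are illustrative.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $numworkers = 4;
my $job_q      = Thread::Queue->new();
my $done_q     = Thread::Queue->new();

sub worker {
    my $tid = threads->self()->tid();
    # Sleeps inside dequeue() between jobs; exits once the queue is ended.
    while ( defined( my $job = $job_q->dequeue() ) ) {
        $done_q->enqueue("$tid:$job");
    }
    return;
}

my @pool = map { threads->create( \&worker ) } 1 .. $numworkers;

my @batches = ( [ 'a' .. 'm' ], [ 'n' .. 'z' ] );
my @completed;
for my $batch (@batches) {
    $job_q->enqueue(@$batch);
    # Wait for this batch before queueing the next; the workers stay alive.
    for ( 1 .. @$batch ) {
        push @completed, $done_q->dequeue();
    }
    print scalar(@completed), " jobs done so far\n";
}

$job_q->end();    # no more work: each worker's dequeue() now returns undef
$_->join() for @pool;
```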

    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku ................... flash japh
Re: A basic 'worker' threading example.
by Anonymous Monk on Jan 22, 2019 at 17:19 UTC

    Thanks Preceptor for a great 'template', great for a person like me who is making my first stumbles in the world of multi-threading in Perl! I have been trying to use the template and, instead of pinging, do a numeric operation and return a number. However, I cannot for the life of me extract the return value. In the example below, I have tried to extract the return value into a variable $sum, which is overwritten at each join... However, I get the following:

    Use of uninitialized value $sum in concatenation (.) or string at test line 26.
    sum=
    Use of uninitialized value $sum in concatenation (.) or string at test line 26.
    sum=


    use strict;
    use warnings;
    use threads;
    use Thread::Queue;

    my $process_q = Thread::Queue -> new(1,2);

    sub worker {
        while ( $process_q -> dequeue() ) {
            my $retVar=threads -> self() -> tid();
            return $retVar;
        }
    }

    $process_q -> end();

    #start some threads
    for ( 1..2 ) {
        threads -> create ( \&worker );
    }

    #Wait for threads to all finish processing.
    foreach my $thr ( threads -> list() ) {
        my $sum=$thr -> join;
        print "sum=$sum\n";
    }

    2019-01-23 Athanasius added code and paragraph tags

      Anonymous Monk:

      Your script is actually very close to working. The only problem is that you didn't properly set the context for the thread. (Read perldoc threads and look for the THREAD CONTEXT section.) Since you didn't capture the return value of the create() call, you set up the thread in VOID context, meaning that you can't fetch its return value, and that's why you're getting an undefined value later when you call join().

      So for you to fix your code, you need only set the appropriate context for the thread. Since you want a scalar result, you need only capture the result of the create() call into a scalar. That way, when you actually call the join() method, it will fetch the scalar result for you. You don't need to keep the scalar, so the simple fix for your program is to change the loop that creates your worker threads from:

      for ( 1..2 ) { threads -> create ( \&worker ); }

      to this:

      for ( 1..2 ) { my $temp_scalar = threads -> create ( \&worker ); }

      Afterwards, when you run your script, you'll get the result:

      $ perl
      sum=1
      sum=2

      Here, I used the 'implicit' method of setting the context. However, if you start building larger applications with different types of worker threads, you might find the explicit method of setting the context to be better. Just be sure to review the entire threads documentation and look for unexpected things. You'll also want to look over the examples and/or test code for the threads, threads::shared and Thread::Queue modules to see how their designers expect them to be used.
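      For the explicit method, the threads documentation describes passing a hash ref of options as the first argument to `create()`; `'context' => 'scalar'` (or the shorthand `'scalar' => 1`) fixes the thread's context no matter what context the `create()` call itself is in. A sketch applied to the test script above, which also tests the dequeued value with `defined` so an item of 0 wouldn't end the loop:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $process_q = Thread::Queue->new( 1, 2 );
$process_q->end();

sub worker {
    while ( defined( my $item = $process_q->dequeue() ) ) {
        return threads->self()->tid();
    }
    return;
}

# Context is set explicitly, so the create() calls can stay in void context.
for ( 1 .. 2 ) {
    threads->create( { 'context' => 'scalar' }, \&worker );
}

my @sums;
foreach my $thr ( threads->list() ) {
    my $sum = $thr->join();
    push @sums, $sum;
    print "sum=$sum\n";
}
```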

      Please learn how to properly quote code in a node so that people can read it more easily. I've reformatted it, made the fix I mentioned earlier, and quoted it here for other readers:

      use strict;
      use warnings;
      use threads;
      use Thread::Queue;

      my $process_q = Thread::Queue -> new(1,2);

      sub worker {
          while ( $process_q -> dequeue() ) {
              my $retVar=threads -> self() -> tid();
              return $retVar;
          }
      }

      $process_q -> end();

      #start some threads
      for ( 1..2 ) {
          my $tmp = threads -> create ( \&worker );
      }

      #Wait for threads to all finish processing.
      foreach my $thr ( threads -> list() ) {
          my $sum=$thr -> join;
          print "sum=$sum\n";
      }

      EDIT: Looks like the janitors cleaned up the formatting in the parent node before I replied.


      When your only tool is a hammer, all problems look like your thumb.

        Many thanks Roboticus for giving me a guide (and even showing how to do it), my test code is working so now I will head off to bigger and better applications! Cheers, R

      Why did you end the queue?
Re: A basic 'worker' threading example.
by sundialsvc4 (Abbot) on Dec 31, 2013 at 14:39 UTC

    It's also worth noting, in addition to this most-excellent post, that there are a lot of Perl workflow/thread/process management packages already out there ... POE, Parallel::ForkManager, and so on and on and on.   This type of threading is a FIP = Frequently Implemented Program, so look around in CPAN for "prior art" that you can readily use in your project.

    Yes, you can always build a door from scratch. But you can also buy a pre-hung door or even a complete wall with doors, windows and electrical wiring, or a mobile home. Therefore, “choose wisely, because the choice is yours to make.” No matter exactly what-it-is that you are doing, you almost always have the option to start from somewhere other than “from scratch.” (Or not.) The set of options available to you in CPAN is very large indeed, so “build vs. borrow” should be an engineering decision that you consider and make from the earliest possible point.

    Actum Ne Agas:   Do Not Do A Thing Already Done.

Node Type: perlmeditation [id://1068673]
Approved by Old_Gray_Bear