PerlMonks  

A basic 'worker' threading example.

by Preceptor (Deacon)
on Dec 29, 2013 at 12:15 UTC ( #1068673=perlmeditation )

This post is aimed at the people who have heard of Perl threading, and think it intriguing - but haven't really gotten to grips with how it's done. I'm going to put together a ... template, if you like, for a very basic style of threading.

Parallel code is somewhat hazardous for the unwary - because you're 'splitting' your program, and making different parts run at different speeds, you can end up with some incredibly frustrating and hard-to-track bugs. Every thread is a race condition waiting to happen. So all the bad habits you've picked up when coding in Perl may well come back and bite you if you 'thread it'.

The simplest thing to thread is what's known as an 'Embarrassingly Parallel' problem. It's a type of problem where there are multiple tasks, but no dependencies, communication or synchronisation needed.

When 'doing' parallel code, you start to think in terms of scalability and efficiency - every thread start has an overhead. So does every communication between threads. However, the most 'expensive' task is synchronising all your threads - they all have to wait until the slowest thread 'catches up'.

Thankfully - an 'embarrassingly parallel' problem has none of these things.

An example I might use is pinging 1000 servers. You want to ping each of them, but you don't need to do so in any particular order. However, if a server is offline, then a 'ping' will wait for a timeout, making the process a lot slower.

The only thing you have to worry about is if you ping them 'all at once' you might end up sending a lot of data across the network.

This is a near perfect example of a type of problem I encounter regularly, and so I give it as example code.

Perl actually has quite a good way of 'spotting' embarrassingly parallel stuff - the 'foreach' loop is often a good sign.

If you're doing the same thing to every item in a list, then there's a good chance that it might be suitable for parallelisation. You may not gain a large advantage from doing it though - the real advantage of threading is in making use of multiple system resources - processors, network sockets, etc. It's not the only way of achieving that result though, and it will - as a result - 'hog' more of a system's resources when it runs (but hopefully for less time).
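To make the 'foreach' hint concrete, here is a minimal sketch that runs the same per-item work serially and then with one thread per item. `process_item` is a made-up stand-in for the real work (a ping, a fetch), and the sketch assumes a perl built with thread support. One thread per item is only sensible for a short list; for something like 1000 servers a fixed pool of workers is the more likely choice.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;

# Hypothetical stand-in for the real per-item work.
sub process_item {
    my ($n) = @_;
    return $n * $n;
}

my @items = ( 1 .. 8 );

# Serial version: the classic 'foreach'.
my @serial;
foreach my $item (@items) {
    push @serial, process_item($item);
}

# Threaded version: no iteration depends on another,
# so each one can run in its own thread.
# 'context' => 'scalar' makes join() hand back the single return value.
my @workers;
foreach my $item (@items) {
    push @workers,
        threads->create( { 'context' => 'scalar' }, \&process_item, $item );
}

# join() waits for each thread in turn and collects its result.
my @threaded;
foreach my $thr (@workers) {
    push @threaded, $thr->join();
}

print "serial:   @serial\n";
print "threaded: @threaded\n";
```

Joining in creation order keeps the results in the same order as the input list, whichever thread happens to finish first.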

To break down the task:

  • We create some 'worker' threads, that do a very simple 'run a command' operation (in this case, ping).
  • We define a list of servers (read from a file), and use the Thread::Queue module to handle queuing.
  • We wait for thread completion, and collate errors.

Which looks a bit like this:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;

my $nthreads = 5;

my $process_q = Thread::Queue -> new();
my $failed_q  = Thread::Queue -> new();

#this is a subroutine, but one that runs 'as a thread'.
#when it starts, it inherits the program state 'as is'. E.g.
#the variable declarations above all apply - but changes to
#values within the program are 'thread local' unless the
#variable is defined as 'shared'.
#Behind the scenes - Thread::Queue are 'shared' arrays.
sub worker {
  #NB - this will sit in a loop indefinitely, until you close the queue
  #using $process_q -> end
  #we do this once we've queued all the things we want to process
  #and the sub completes and exits neatly.
  #however if you _don't_ end it, this will sit waiting forever.
  #'defined' means an entry that happens to be false (e.g. "0")
  #doesn't stop the worker early.
  while ( defined ( my $server = $process_q -> dequeue() ) )
  {
    chomp ( $server );
    print threads -> self() -> tid(). ": pinging $server\n";
    my $result = `/bin/ping -c 1 $server`;
    if ( $? ) { $failed_q -> enqueue ( $server ) }
    print $result;
  }
}

#insert tasks into thread queue.
open ( my $input_fh, "<", "server_list" ) or die $!;
$process_q -> enqueue ( <$input_fh> );
close ( $input_fh );

#we 'end' process_q - when we do, no more items may be inserted,
#and 'dequeue' returns 'undefined' when the queue is emptied.
#this means our worker threads (in their 'while' loop) will then exit.
$process_q -> end();

#start some threads
for ( 1..$nthreads )
{
  threads -> create ( \&worker );
}

#Wait for threads to all finish processing.
foreach my $thr ( threads -> list() )
{
  $thr -> join();
}

#collate results. ('synchronise' operation)
while ( my $server = $failed_q -> dequeue_nb() )
{
  print "$server failed to ping\n";
}

Now, this _is_ a very simple model of a 'threaded' task - and it will only suit situations where there are no dependencies on the results.

Replies are listed 'Best First'.
Re: A basic 'worker' threading example.
by Random_Walk (Prior) on Dec 29, 2013 at 13:46 UTC

    Nice post. I never used q->end() before, but enqueued an undef for each thread I had spawned. From now on I will go with the end method.
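For comparison, the older idiom mentioned here (one undef per spawned thread) might look roughly like the sketch below. The queue names and the upper-casing 'work' are placeholders invented for the example, and it assumes a perl built with thread support.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $nthreads = 3;
my $work_q   = Thread::Queue->new();
my $result_q = Thread::Queue->new();

sub worker {
    # dequeue() blocks until an item arrives; an undef item is our 'stop' marker.
    while ( defined( my $item = $work_q->dequeue() ) ) {
        $result_q->enqueue( uc $item );    # placeholder for real work
    }
    return;
}

my @workers = map { threads->create( \&worker ) } 1 .. $nthreads;

$work_q->enqueue(qw( alpha beta gamma delta ));

# One undef per worker: each thread consumes exactly one and then exits.
$work_q->enqueue(undef) for 1 .. $nthreads;

$_->join() for @workers;

# All workers have finished, so a non-blocking drain is safe here.
my @results;
while ( defined( my $r = $result_q->dequeue_nb() ) ) {
    push @results, $r;
}
@results = sort @results;
print "@results\n";
```

The weak spot of this approach is the bookkeeping: the number of undefs has to match the number of workers, which is exactly what end() removes.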

    Normally I start the threads before I start putting work on the queue. In this case it won't make much difference unless the list of hosts to ping is very large, or the file read is on the other side of a very slow network link. But sometimes the process sourcing the list of work is comparatively slow, so letting the threads get to work sooner can help.

    Cheers,
    R.

    Pereant, qui ante nos nostra dixerunt!

      q -> end is something that a newer version of the Thread::Queue library supports - it's why I didn't use it initially either, and worked around it with some sort of fiddle with pending, semaphores or queuing a load of 'exit' codes. As an alternative, I might offer '$thr->kill'. If you install a handler for 'TERM' inside the thread, you can send 'SIGTERM' to your thread to force it to exit. E.g.:

      #add to thread:
      $SIG{'TERM'} = sub { threads->exit(); };

      ## just before the 'join' add:
      $thr -> kill ( "SIGTERM" );

      This isn't a 'real' SIGTERM - the threads module doesn't send it via the operating system, it emulates the signal at the Perl level - but it's a good way of getting a thread to terminate early. (Although, I'd note that _because_ it's not a 'proper' SIGTERM, it also might be delayed until the current operation has finished.)
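A self-contained sketch of the kill approach, under a few assumptions: the endless loop is a stand-in for a worker that would otherwise never return, and `$ready_q` is a helper queue added for the example so the signal isn't sent before the handler exists (a signal sent to a thread without a handler for it is not acted upon). It needs a perl built with thread support.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $ready_q = Thread::Queue->new();

sub worker {
    # Install the handler first, then tell the main thread we are ready.
    $SIG{'TERM'} = sub { threads->exit(); };
    $ready_q->enqueue('ready');

    # Pretend to be stuck: without the signal this loop never ends.
    while (1) {
        select( undef, undef, undef, 0.1 );    # short sleep
    }
}

my $thr = threads->create( \&worker );

$ready_q->dequeue();    # wait until the handler is installed
$thr->kill('TERM');     # emulated signal; acted on once the current op finishes
$thr->join();

print "worker stopped\n";
```

The short sleep matters: the handler only runs between operations, so a worker blocked in one long call would not react until that call returns.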

      I would normally start the threads first, but dislike doing it if it might 'die' on that file open. There are neater ways of doing it, but I thought to err on the side of brevity.
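One possible way to get both, sketched below: open the input before creating any threads (so a failed open dies with nothing running), then start the workers and feed the queue line by line. For the sake of a runnable example an in-memory string stands in for the 'server_list' file and the 'ping' is replaced by a placeholder; both are assumptions of the sketch, as is a perl built with thread support and a Thread::Queue new enough to have end().

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $nthreads  = 2;
my $process_q = Thread::Queue->new();
my $seen_q    = Thread::Queue->new();

sub worker {
    while ( defined( my $server = $process_q->dequeue() ) ) {
        chomp $server;
        $seen_q->enqueue($server);    # placeholder for the real ping
    }
    return;
}

# Assumption: an in-memory string stands in for the 'server_list' file.
my $server_list = "host-a\nhost-b\nhost-c\n";

# 1. Open first: if this dies, no threads have been started yet.
open( my $input_fh, '<', \$server_list ) or die $!;

# 2. Start the workers; they block in dequeue() until work arrives.
my @workers = map { threads->create( \&worker ) } 1 .. $nthreads;

# 3. Feed the queue line by line, so workers can begin straight away.
while ( my $line = <$input_fh> ) {
    $process_q->enqueue($line);
}
close($input_fh);

# 4. No more work: end() lets the blocked dequeue() calls return undef.
$process_q->end();
$_->join() for @workers;

my @seen;
while ( defined( my $s = $seen_q->dequeue_nb() ) ) {
    push @seen, $s;
}
@seen = sort @seen;
print "processed: @seen\n";
```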

Re: A basic 'worker' threading example.
by zentara (Archbishop) on Dec 30, 2013 at 12:20 UTC
    It may be that my mind has been warped by trying to get threads to work well with event loop systems, but this way, from quite long ago, is a way my mind devised to get reusable threads, i.e. a pool of sleeping worker threads. I hope you find it fascinating. :-) BTW, yes I know you can avoid the GOTO's and LABELS with proper loop construction, but I like the old dog trick of using goto. :-)

    #!/usr/bin/perl
    use strict;
    use warnings;
    use threads;
    use threads::shared;

    my %shash;
    my @to_be_processed = ('a'..'z');
    my @ready:shared = ();
    my $numworkers = 10;

    foreach my $dthread(1..$numworkers){
        share $shash{$dthread}{'go'};
        $shash{$dthread}{'go'} = 0;
        share $shash{$dthread}{'fileno'};    #in case you want
        $shash{$dthread}{'fileno'} = '';     #shared filehandles
        share $shash{$dthread}{'data'};
        $shash{$dthread}{'data'} = '';
        share $shash{$dthread}{'pid'};
        $shash{$dthread}{'pid'} = -1;
        share $shash{$dthread}{'die'};
        $shash{$dthread}{'die'} = 0;
        $shash{$dthread}{'thread'} = threads->new(\&worker, $dthread);
        push @ready, $dthread;
    }

    print "\t\t",scalar @ready," threads ready.......press a key to start threads\n";
    <>;

    while (my $t = shift(@ready)){
        $shash{$t}{'data'} = shift @to_be_processed;
        $shash{$t}{'go'} = 1;
    }

    while(1){
        if( scalar @ready > 0 ){
            if( my $data = shift @to_be_processed ){
                my $t = shift(@ready);
                $shash{$t}{'data'} = $data;
                $shash{$t}{'go'} = 1;
                print "thread $t restarting\n";
            }else{
                print "out of input\n";
                goto WAIT;
            }
        }
    }

    WAIT:
    print "\n\n\nWAITING FOR FINISH\n\n\n";
    while(1){
        print "\n\n\n",scalar @ready," are ready\n\n\n";
        if ( scalar @ready < $numworkers ){ sleep 1 }else{
            foreach my $t(@ready){
                $shash{$t}{'die'} = 1;
                $shash{$t}{'thread'}->join;
                print "joining thread $t\n";
            }
            exit;
        }
    }

    sub worker{
        my $thr_num = shift;
        print "$thr_num started\n";
        my $count;

        START:
        while(1){
            if( $shash{$thr_num}{'die'} ){
                print "thread $thr_num finishing\n";
                return}

            #wait for $go_control
            if($shash{$thr_num}{'go'}){
                if($shash{$thr_num}{'die'}){
                    print "thread finishing\n";
                    return}
                $count++;
                my $str = ' 'x$thr_num;    #printout spacer
                print $str.$thr_num.'->'.$count.$shash{$thr_num}{'data'},"\n";
                if ($count > 10){ goto RECYCLE; }
                #select(undef,undef,undef,.25);
                sleep rand 5
            }else{
                $count = 0;
                select(undef,undef,undef,.25);
            }# sleep until awakened
        } #end while(1)

        RECYCLE:
        $shash{$thr_num}{'go'} = 0;
        print "$thr_num done....going back to sleep\n";
        $shash{$thr_num}{'data'} = '';
        $count = 0;
        push @ready, $thr_num;
        print "pushing $thr_num\n";
        goto START;
        return;
    }

    I'm not really a human, but I play one on earth.
    Old Perl Programmer Haiku ................... flash japh
Re: A basic 'worker' threading example.
by sundialsvc4 (Abbot) on Dec 31, 2013 at 14:39 UTC

    It's also worth noting, in addition to this most-excellent post, that there are a lot of Perl workflow/thread/process management packages already out there ... POE, Parallel::ForkManager, and so on and on and on.   This type of threading is a FIP = Frequently Implemented Program, so look around in CPAN for "prior art" that you can readily use in your project.

    Yes, you can always build a door from scratch.   But you can also buy a pre-hung door or even a complete wall with doors, windows and electrical wiring, or a mobile home.   Therefore, “choose wisely, because the choice is yours to make.”   No matter exactly what-it-is that you are doing, you almost always have the option to start from somewhere other than “from scratch.”   (Or not.)   The set of options available to you in CPAN is very large indeed, so “build vs. borrow” should be an engineering decision that you consider and make from the earliest possible point.

    Actum Ne Agas:   Do Not Do A Thing Already Done.

Node Type: perlmeditation [id://1068673]
Approved by Old_Gray_Bear