PerlMonks  

Receiving data asynchronously in a long-run process

by Annirak (Novice)
on Sep 21, 2009 at 17:28 UTC (#796581)

Annirak has asked for the wisdom of the Perl Monks concerning the following question:

I'm trying to build a worker process that accepts and processes jobs. I'm getting hung up on how to accept the jobs: I'd like the worker to be able to accept jobs without having to check for them periodically. The way I have implemented it currently is this:

  1. The job issuer spawns a worker and connects to it via a pipe: my $pid = open($pipe, "| $worker");
  2. The job issuer freezes a job object (or hash or whatever).
  3. The job issuer calculates the length of the frozen object, then sends it: print $pipe "$len\n$job";
  4. The worker picks this job up with: $len = <STDIN>; chomp $len; read(STDIN, $data, $len); $ref = thaw($data);
  5. The worker should enqueue the job for processing, then continue processing its current job.
  6. goto 2.
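The framing in steps 3 and 4 can be sketched as a pair of helpers. This is a minimal illustration, not the original poster's code: the names `send_job` and `recv_job` are hypothetical, and the loop around `read` is an added precaution because `read` may return fewer bytes than requested.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Hypothetical helper: write one job as "<length>\n<frozen bytes>".
sub send_job {
    my ($fh, $job) = @_;
    my $frozen = freeze($job);    # $job must be a reference
    print {$fh} length($frozen), "\n", $frozen
        or die "write failed: $!";
    return;
}

# Hypothetical helper: read one job back; returns undef at end of input.
sub recv_job {
    my ($fh) = @_;
    my $len = <$fh>;
    return undef unless defined $len;
    chomp $len;
    my $data = '';
    # Keep reading until the whole frozen record has arrived.
    while (length($data) < $len) {
        my $got = read($fh, $data, $len - length($data), length($data));
        die "read failed: $!" unless defined $got;
        die "unexpected end of input" if $got == 0;
    }
    return thaw($data);
}
```

Because the length travels ahead of the payload, the frozen bytes may contain newlines without confusing the reader.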

Here's the catch: For the second or later iteration, the worker may be in the middle of a long-running process, so there's no way to know how long the worker might take to service STDIN. While I could just let data sit in the pipe, I'm not sure how reliable that is. More to the point, it makes it harder (impossible?) for the worker to collect information about pending jobs.
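One way to learn about pending jobs without blocking is for the worker to drain the pipe at convenient points in its long job. The sketch below assumes the same "<length>\n<frozen bytes>" framing as in the steps above. The names `drain_input`, `$buffer` and `@pending` are hypothetical. It uses `sysread` with its own buffer, since mixing `select` with buffered `<STDIN>` reads is unreliable.

```perl
use strict;
use warnings;
use IO::Select;
use Storable qw(thaw);

my $buffer = '';    # bytes read from the pipe but not yet decoded
my @pending;        # decoded jobs waiting to be run

# Read whatever is already in the pipe without blocking, then decode
# every complete record. Returns false once the writer has closed the pipe.
sub drain_input {
    my ($fh) = @_;
    my $sel  = IO::Select->new($fh);
    my $open = 1;
    while (my @ready = $sel->can_read(0)) {
        my $got = sysread($fh, $buffer, 65536, length $buffer);
        die "sysread failed: $!" unless defined $got;
        if ($got == 0) { $open = 0; last; }    # end of file
    }
    while ($buffer =~ /\A(\d+)\n/) {
        my ($len, $header) = ($1, length($1) + 1);
        last if length($buffer) < $header + $len;    # record incomplete
        push @pending, thaw(substr($buffer, $header, $len));
        substr($buffer, 0, $header + $len) = '';
    }
    return $open;
}
```

A worker could call `drain_input(\*STDIN)` between steps of its current job; `scalar @pending` then gives the number of queued jobs. This is still a form of polling, only a cheap one.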

So I got the bright idea of signaling the worker when data has been written to its input pipe. Here's the worker:

    #!/usr/bin/perl
    use strict;
    use Storable qw(freeze thaw);

    BEGIN {
        $SIG{USR1} = \&drdy;
    }

    my @datastore = ();

    sub drdy {
        print "Signal!\n";
        my $d = <STDIN>;
        print $d . "\n";
        push @datastore, $d;
        $SIG{USR1} = \&drdy;
    }

    while (1) {
        sleep 1;
        print shift @datastore if (scalar @datastore);
    }

This worker gets stuck in &drdy after the first SIGUSR1. This leads me to believe that I can't read from STDIN in a signal handler. The job issuer sets {my $fh = select $pipe;$|=0;select $fh} so the problem is not that the communication is buffered. The entire transaction should be in the worker's input pipe before the job issuer signals the worker.
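One detail worth double-checking: according to perlvar, `$|` turns autoflush on when set to a nonzero value, and 0 is the default, so `$|=0` as quoted leaves the pipe buffered. Whether that explains the hang here is an assumption. The self-contained sketch below only shows the difference on a local pipe; the helper `readable` and the variable names are hypothetical.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Select;

# Hypothetical helper: true if data can be read from $fh right now.
sub readable {
    my ($fh) = @_;
    my @ready = IO::Select->new($fh)->can_read(0);
    return scalar @ready;
}

pipe(my $reader, my $writer) or die "pipe failed: $!";

# With the default ($| = 0) the line stays in the writer's buffer.
print {$writer} "job 1\n";
my $visible_buffered = readable($reader) ? 1 : 0;

# autoflush(1) is equivalent to: select($writer); $| = 1; select back.
$writer->autoflush(1);
print {$writer} "job 2\n";
my $visible_flushed = readable($reader) ? 1 : 0;

print "before autoflush: $visible_buffered, after: $visible_flushed\n";
```

If the issuer signals the worker while the job is still sitting in the issuer's buffer, a `<STDIN>` inside the handler would wait for a line that has not been sent yet.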

I can only think of one other option to handle this IPC problem: make the worker multithreaded with one pipe management thread and one work thread. This seems like a pretty heavyweight solution to a pretty simple problem.

Is multithreading the way to do this?

Replies are listed 'Best First'.
Re: Receiving data asynchronously in a long-run process
by Joost (Canon) on Sep 21, 2009 at 18:15 UTC
    The worker should enqueue the job for processing, then continue processing its current job.

    Unless your worker is actually capable of doing more than one job at a time (by, for instance, forking off other processes to do that) there is no reason to fetch jobs ASAP. In the simpler case where jobs are processed sequentially you generally only want to get a new job once the old one is finished. But that will possibly block the issuer if the pipe fills up.

    Also, unless you're talking about really simple systems, you generally want to have multiple workers (and possibly multiple issuers) running at the same time, possibly on different machines. There are generic systems for doing the message queueing in that situation; they're called message queues.

    The message queue will receive jobs (messages) and set up routing and queues for multiple receivers as you need. That simplifies the workers to a more or less pure "fetch, process, repeat" loop. Take a look at activemq or rabbitmq for example. (Note: activemq takes some work to configure, but supports STOMP as the protocol, which is really straightforward. To understand rabbitmq you need to understand the AMQP specs and use POE, and neither is completely trivial**)

    I can only think of one other option to handle this IPC problem: make the worker multithreaded with one pipe management thread and one work thread. This seems like a pretty heavyweight solution to a pretty simple problem.
    POE works well for these situations, probably better than threads, though I wouldn't describe it as lightweight.

    Also, unless you're comfortable with dropping jobs when crashes occur and running everything on a single machine or thread, your problem isn't simple. :)

    edit: **) for now, you may also want my forks of the AMQP modules, here and here (you need both).

      Wow, joost, that's a lot of really great information. My intent was originally to write a library which would work much like Thread::Pool::Simple, so you're absolutely right; I was planning to use multiple workers. I had intended this to be a local-machine-only library, but the possibility of handling off-machine jobs for particularly granular tasks has a lot to recommend it too.

      For a local-machine library, I suspect that activemq is probably heavier than necessary. For a grid computing library, it's probably a great solution. I don't think I need quite that much power, but since I do want to release this library, perhaps the flexibility is worthwhile.

Re: Receiving data asynchronously in a long-run process
by rcaputo (Chaplain) on Sep 21, 2009 at 17:45 UTC

    If you use a named pipe and file locking to synchronize its access, then the writer (or writers) will block until a reader is ready. In your case, if no workers are ready to read from the named pipe, then the job issuer will block.
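A rough sketch of the named-pipe idea, under some assumptions: the helper names `fifo_send` and `fifo_receive` are hypothetical, jobs are single lines of text, and the lock is taken on a companion `.lock` file rather than on the FIFO itself.

```perl
use strict;
use warnings;
use POSIX qw(mkfifo);
use Fcntl qw(:flock);

# Create the FIFO once, before any reader or writer starts
# (the path is a placeholder):
#   mkfifo('/tmp/jobs.fifo', 0600) or die "mkfifo: $!";

# Writer: hold an exclusive lock so that records from several issuers
# cannot interleave, then write one line. open() for writing blocks
# until some reader has the FIFO open.
sub fifo_send {
    my ($fifo, $line) = @_;
    open(my $lock, '>>', "$fifo.lock") or die "open lock: $!";
    flock($lock, LOCK_EX) or die "flock: $!";
    open(my $out, '>', $fifo) or die "open fifo: $!";
    print {$out} "$line\n";
    close $out or die "close fifo: $!";
    close $lock;    # closing the handle releases the lock
    return;
}

# Reader: open() blocks until a writer shows up, then reads until
# that writer closes its end.
sub fifo_receive {
    my ($fifo) = @_;
    open(my $in, '<', $fifo) or die "open fifo: $!";
    my @lines = <$in>;
    close $in;
    chomp @lines;
    return @lines;
}
```

With this arrangement the blocking happens in `open`, so an issuer calling `fifo_send` simply waits until a worker calls `fifo_receive`.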

    It's important to consider that workers may need to poll, even if you'd prefer them not to. If no workers are ready, then the job issuer will need to save the job somewhere reliable. That's generally a database or spool directory. In that case, workers must poll for new jobs when they become ready again.
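The spool-directory variant might look roughly like this. It is a sketch under assumptions, not a tested library: `spool_job` and `claim_job` are hypothetical names, the file naming scheme is invented for illustration, and the spool directory is assumed to sit on a single local filesystem so that `rename` is atomic.

```perl
use strict;
use warnings;
use Storable qw(nstore retrieve);

my $seq = 0;    # per-process counter to keep file names unique

# Issuer: write the job under a temporary name, then rename it into
# place so that workers never see a half-written file.
sub spool_job {
    my ($dir, $job) = @_;
    my $name = sprintf '%d-%d-%06d', time, $$, $seq++;
    nstore($job, "$dir/$name.tmp") or die "store failed: $!";
    rename "$dir/$name.tmp", "$dir/$name.job" or die "rename failed: $!";
    return "$dir/$name.job";
}

# Worker: try to claim a pending job by renaming it. Only one worker
# can win the rename. Returns undef when nothing is pending.
sub claim_job {
    my ($dir) = @_;
    opendir(my $dh, $dir) or die "opendir $dir: $!";
    my @jobs = sort grep { /\.job\z/ } readdir $dh;
    closedir $dh;
    for my $name (@jobs) {
        my $claimed = "$dir/$name.$$";
        next unless rename "$dir/$name", $claimed;    # lost the race
        my $job = retrieve($claimed);
        unlink $claimed;
        return $job;
    }
    return;
}
```

Note that this version removes the file before the job has run, so a worker crash loses the job. Deleting the claimed file only after the job completes would be the safer choice.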

      After thinking about it some more, I think the way to do what I've proposed is to implement some locking between the worker and the issuer. Doing this requires a manager in the middle somewhere. The manager would need to hold the jobs in a queue and wait for the workers to request jobs/indicate readiness to receive jobs.

      I'm also not sure that my approach is the best. It might be better to provide for the option of spreading my workers across multiple machines. Naturally, that will mean implementing TCP connections instead of pipes. It will likely also mean a more complex startup process, and much more complex management.

Node Type: perlquestion [id://796581]
Approved by Herkum