PerlMonks  

POE::Wheel::ReadWrite: "low priority" events

by vlad_s (Novice)
on Jan 20, 2012 at 16:17 UTC ( [id://948985]=perlquestion )

vlad_s has asked for the wisdom of the Perl Monks concerning the following question:

Hi, I'm writing a POE-based application that should do the following:

  • Read a line from a file
  • Process the line; whilst processing, a bunch of other events may be emitted and processed accordingly
  • Once all generated events are processed, read the next line, and so on

In other words I'd like to ensure that my generated events make their way into the event queue before the "next line available" event arrives from ReadWrite. This doesn't seem to be the case if I use post() or yield() for generated events.

One solution that seems possible would be using POE::Kernel::call() rather than post(). In that case the InputEvent handler won't return until all processing is done. However, for certain reasons this approach doesn't work for me and I'd like to use post().

To illustrate the issue, here's a simple piece of code:

    use POE qw(Wheel::ReadWrite);

    my $S2 = POE::Session->create(
        inline_states => {
            _start => sub {
                $_[HEAP]->{reader} = POE::Wheel::ReadWrite->new(
                    Handle     => \*STDIN,
                    InputEvent => 'got_input',
                );
            },
            got_input => \&got_input,
            bang      => \&bang,
        }
    );

    sub got_input {
        print $_[ARG0], "\n";
        $poe_kernel->yield('bang', $_[ARG0]);
    }

    sub bang {
        print "Bang\n";
    }

    $poe_kernel->run();

If I feed a 5-line file to this script, the following gets printed:

    This is line number 1
    This is line number 2
    This is line number 3
    This is line number 4
    This is line number 5
    Bang
    Bang
    Bang
    Bang
    Bang

Whereas what I want is:

    This is line number 1
    Bang
    This is line number 2
    Bang
    This is line number 3
    Bang
    This is line number 4
    Bang
    This is line number 5
    Bang

Anybody aware of an easy way of doing it?
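
For comparison, the call()-based variant mentioned in the question can be sketched roughly as follows. This is only an illustration of why call() gives the desired ordering, not a recommendation, since the question rules it out. Assumptions not in the original post: the input comes from a pipe rather than STDIN so that the sketch is self-contained, output is collected in a hypothetical @log array, and a got_error handler is added to drop the wheel at EOF.

```perl
use strict;
use warnings;
use POE qw(Wheel::ReadWrite);

# Assumption: feed the wheel from a pipe instead of STDIN so the sketch
# needs no external input. Two short lines fit easily in the pipe buffer.
pipe(my $read_end, my $write_end) or die "pipe failed: $!";
print {$write_end} "This is line number 1\nThis is line number 2\n";
close $write_end;

my @log;    # hypothetical: collects output instead of printing directly

POE::Session->create(
    inline_states => {
        _start => sub {
            $_[HEAP]{reader} = POE::Wheel::ReadWrite->new(
                Handle     => $read_end,
                InputEvent => 'got_input',
                ErrorEvent => 'got_error',
            );
        },
        got_input => sub {
            push @log, $_[ARG0];
            # call() invokes the 'bang' handler synchronously, so it runs
            # before got_input returns and before the next line is delivered.
            $_[KERNEL]->call($_[SESSION], 'bang', $_[ARG0]);
        },
        bang => sub { push @log, 'Bang' },
        # EOF is reported through ErrorEvent; dropping the wheel lets the
        # session stop.
        got_error => sub { delete $_[HEAP]{reader} },
    },
);

POE::Kernel->run();
print "$_\n" for @log;
```

With call(), each "Bang" is logged directly after its line because the handler never goes through the event queue. The price is that got_input does not return until all nested processing has finished.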

Replies are listed 'Best First'.
Re: POE::Wheel::ReadWrite: "low priority" events
by rcaputo (Chaplain) on Jan 20, 2012 at 17:33 UTC

    Here's the long way, which doesn't assume you know the minimum length of input records. I've added copious comments to make this more tutorial-like.

        use warnings;
        use strict;
        use POE qw(Wheel::ReadWrite);

        my $S2 = POE::Session->create(
            inline_states => {
                _start => sub {
                    $_[HEAP]->{input_buffer} = [ ];
                    $_[HEAP]->{reader} = POE::Wheel::ReadWrite->new(
                        Handle     => \*STDIN,
                        InputEvent => 'got_input',
                        ErrorEvent => 'got_input_error',
                    );
                },
                got_input          => \&handle_input,
                got_input_error    => \&handle_input_error,
                step_one           => \&do_step_one,
                step_two           => \&do_step_two,
                process_next_input => \&process_next_input,
            }
        );

        POE::Kernel->run();
        exit;

        sub handle_input {
            # Buffer the input rather than process it immediately.
            my $buffered_count = push @{ $_[HEAP]{input_buffer} }, $_[ARG0];

            # If this is the first input record in the buffer, then begin
            # processing input. We don't need to begin every time, since the
            # rest of the code will continue to process input as long as there's
            # something in the buffer.

            # However, we do want to pause the reader's input, since otherwise
            # it might slurp an entire file or stream into the input buffer
            # while we're doing something else.

            # Note that we can't just pause_input() if we want to handle one
            # input record at a time. For efficiency, POE::Wheel::ReadWrite may
            # have read multiple input records at once.

            if ($buffered_count == 1) {
                $_[HEAP]{reader}->pause_input();
                $poe_kernel->yield('process_next_input');
            }
        }

        # Stop the reader on input error, which may simply be EOF.
        # The program exits shortly afterwards.
        # It may be useful to print the error later on.
        sub handle_input_error {
            delete $_[HEAP]{reader};
        }

        sub process_next_input {
            # Get the next job from the input buffer.
            my $next_input = shift @{ $_[HEAP]{input_buffer} };

            # If there was no next input, then resume input so more jobs can be
            # acquired from STDIN.
            unless (defined $next_input) {
                $_[HEAP]->{reader}->resume_input();
                return;
            }

            # Simulate multi-event asynchronous work.
            print "Next input: $next_input\n";
            $_[KERNEL]->yield('step_one', $next_input);
        }

        sub do_step_one {
            print " step one: $_[ARG0]\n";
            $_[KERNEL]->yield('step_two', $_[ARG0]);
        }

        sub do_step_two {
            print " step two: $_[ARG0]\n";
            $_[KERNEL]->yield('process_next_input');
        }

      Thank you very much Rocco, the idea is clear.

      However this assumes that we know which event is issued last - because that event must issue 'process_next_input'.

      What if we don't know that, because the number and order of events generated while processing a line is more or less random?

      I changed your sample so that the step_two event sometimes gets issued and sometimes doesn't. I also added a get_event_count() check to see whether there are any further events in the queue. That seems to achieve my goal:

          sub do_step_one {
              print " step one: $_[ARG0]\n";
              $_[KERNEL]->yield('step_two', $_[ARG0]) if rand(1) > 0.5;
              $_[KERNEL]->yield('process_next_input')
                  if $poe_kernel->get_event_count() == 0;
          }

          sub do_step_two {
              print " step two: $_[ARG0]\n";
              $_[KERNEL]->yield('process_next_input')
                  if $poe_kernel->get_event_count() == 0;
          }

      Thanks once again for your time!

        You could implement the Chain of Responsibility design pattern by promoting ARG0 from an input string to a data structure that includes the input and the steps to process it.

        First define a subroutine to pass the job to the next step in the recipe:

            sub what_next {
                my ($kernel, $job) = @_;

                my $next_step = shift @{ $job->{recipe} };
                if (defined $next_step) {
                    $kernel->yield( $next_step => $job );
                }
                else {
                    $kernel->yield( 'process_next_input' );
                }
            }

        process_next_input() builds the job and the initial recipe, and then uses what_next() to start the process:

            sub process_next_input {
                ...;

                my $job = {
                    input  => $next_input,
                    recipe => [ 'step_one', 'step_two' ],
                };

                what_next($_[KERNEL], $job);
            }

        All stages of the process end by calling what_next() to pass the work on. You could get fancy by pushing coderefs onto the recipe that conditionally return the next event in the chain.
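
        The coderef idea could look something like the sketch below. It is not taken from the thread's code: Local::StubKernel is a hypothetical stand-in for the POE kernel (a plain FIFO with a yield() method) so that the example runs without POE, and the "longer than 10 characters" rule is an arbitrary illustration of a conditional step. A coderef in the recipe receives the job and returns an event name, or undef to skip itself.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the POE kernel, so the sketch runs without POE.
# yield() appends to a FIFO queue; run() drains it in order.
package Local::StubKernel;

sub new { return bless { queue => [], handlers => {} }, shift }

sub on {
    my ($self, $event, $code) = @_;
    $self->{handlers}{$event} = $code;
}

sub yield {
    my ($self, $event, @args) = @_;
    push @{ $self->{queue} }, [ $event, @args ];
}

sub run {
    my $self = shift;
    while (my $next = shift @{ $self->{queue} }) {
        my ($event, @args) = @$next;
        $self->{handlers}{$event}->($self, @args);
    }
}

package main;

my @log;

# Like what_next() in the reply, except that a recipe entry may be a coderef
# that decides at run time which event, if any, comes next.
sub what_next {
    my ($kernel, $job) = @_;
    while (defined(my $step = shift @{ $job->{recipe} })) {
        $step = $step->($job) if ref $step eq 'CODE';
        next unless defined $step;             # the coderef opted out
        return $kernel->yield($step => $job);
    }
    $kernel->yield('process_next_input');      # recipe exhausted
}

my $kernel = Local::StubKernel->new;
for my $name (qw(one two)) {
    $kernel->on("step_$name" => sub {
        my ($k, $job) = @_;
        push @log, "step $name: $job->{input}";
        what_next($k, $job);
    });
}
$kernel->on(process_next_input => sub { push @log, 'process_next_input' });

for my $input ('short', 'a much longer line') {
    my $job = {
        input  => $input,
        recipe => [
            'step_one',
            # Arbitrary rule: run step_two only for inputs over 10 characters.
            sub { length($_[0]{input}) > 10 ? 'step_two' : undef },
        ],
    };
    what_next($kernel, $job);
    $kernel->run;
}

print "$_\n" for @log;
```

        Because every stage ends in what_next(), process_next_input is yielded exactly once per job, regardless of how many steps the coderefs select. That avoids having to know in advance which event is issued last.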

        Sorry for not responding in so long. I'm doing PerlMonks wrong, and I don't notice my message inbox until I actively return to the site. If there's some sort of RSS-like way to check for messages, that would help a lot.

Re: POE::Wheel::ReadWrite: "low priority" events
by rcaputo (Chaplain) on Jan 20, 2012 at 17:47 UTC

    Here's the shorter, less efficient method, which assumes you do know the minimum length of input records. It tunes the sysread() length down to the minimum record length to guarantee no more than one input record is read at a time.

    Of course you could be ultra-conservative and tune sysread() down to one octet at a time. This would work in all cases, but the general rule is that You're Doing It Wrong™ if you ask Perl to process strings one octet at a time.

    This example assumes lines of input are at least 10 octets each. It will fail on shorter input.

        use warnings;
        use strict;
        use POE qw(Wheel::ReadWrite Driver::SysRW);

        use constant MIN_RECORD_LENGTH => 10;

        my $S2 = POE::Session->create(
            inline_states => {
                _start => sub {
                    $_[HEAP]->{input_buffer} = [ ];
                    $_[HEAP]->{reader} = POE::Wheel::ReadWrite->new(
                        # Reduce the sysread size to the minimum record length. This
                        # is less efficient than slurping larger chunks of input, but
                        # it prevents more than one record from being read at a time.
                        Driver => POE::Driver::SysRW->new(
                            BlockSize => MIN_RECORD_LENGTH,
                        ),
                        Handle     => \*STDIN,
                        InputEvent => 'got_input',
                        ErrorEvent => 'got_input_error',
                    );
                },
                got_input       => \&handle_input,
                got_input_error => \&handle_input_error,
                step_one        => \&do_step_one,
                step_two        => \&do_step_two,
            }
        );

        POE::Kernel->run();
        exit;

        sub handle_input {
            # POE::Wheel::ReadWrite's input buffer doesn't contain any more
            # complete input records, so we can simply pause input here.
            $_[HEAP]{reader}->pause_input();

            # And begin processing the input secure in the knowledge that more
            # records won't arrive until we're done.
            print "Next input: $_[ARG0]\n";
            $_[KERNEL]->yield('step_one', $_[ARG0]);
        }

        # Stop the reader on input error, which may simply be EOF.
        # The program exits shortly afterwards.
        # It may be useful to print the error later on.
        sub handle_input_error {
            delete $_[HEAP]{reader};
        }

        sub do_step_one {
            print " step one: $_[ARG0]\n";
            $_[KERNEL]->yield('step_two', $_[ARG0]);
        }

        sub do_step_two {
            print " step two: $_[ARG0]\n";

            # Resume input after we're done.
            $_[HEAP]{reader}->resume_input();
        }

      I also tried your second version, but if you set BlockSize less than the file size, for some weird reason this session isn't able to post events to other sessions, returning "No such process".

      In the debugger it fails on the following line:

          POE::Kernel::_data_ses_resolve(/usr/local/share/perl5/POE/Resource/Sessions.pm:361):
          361:    return undef unless exists $kr_session_refs{$session}; # Prevents autoviv.

      Not sure if that's ok, but nevertheless I'll go for your first version. Thanks a lot!

        The "No such process" error was due to the recipient session being stopped by POE as it didn't have any resources to monitor. Setting an alias on that session cured the problem.
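
        For anyone hitting the same error, a minimal sketch of the alias fix might look like this. The session layout and the names worker and job are hypothetical; they are not from my actual application. The point is only that the recipient has nothing else to keep it alive, so it calls alias_set() in _start.

```perl
use strict;
use warnings;
use POE;

my @log;    # hypothetical: collects results for printing at the end

# Recipient session: it has no wheels, timers, or pending events of its
# own, so the alias is the only thing keeping it alive after _start.
POE::Session->create(
    inline_states => {
        _start => sub { $_[KERNEL]->alias_set('worker') },
        job    => sub { push @log, "worker got: $_[ARG0]" },
    },
);

# Sender session: addresses the recipient by its alias.
POE::Session->create(
    inline_states => {
        _start => sub {
            # post() returns false and sets $! (ESRCH, "No such process")
            # when the target session cannot be resolved.
            $_[KERNEL]->post(worker => job => 'line 1')
                or push @log, "post failed: $!";
        },
    },
);

POE::Kernel->run();
print "$_\n" for @log;
```

        Without the alias_set() call the recipient is stopped right after _start, and the post() fails. With it, the recipient stays around while other sessions are still active.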

        As far as I can see both methods proposed by Rocco work quite well.
