Beefy Boxes and Bandwidth Generously Provided by pair Networks
XP is just a number
 
PerlMonks  

Event based handling of multiple connections with AnyEvent

by bennymack (Pilgrim)
on Oct 28, 2008 at 20:01 UTC ( #720090=perlquestion: print w/ replies, xml ) Need Help??
bennymack has asked for the wisdom of the Perl Monks concerning the following question:

Hello, Monks.

It's been a while but I've got a problem that I can't easily find the solution to for some reason. It involves event-based programming which I am just starting to wrap my head around.

I'm basically trying to write a simple test server that can allow multiple connections per process. It seems like this should be right up the alley of any of the asynchronous IO modules on CPAN. As in, it seems like I should be able to write a single process server that handles multiple long-lived requests by multiplexing the socket reads and writes.

Unfortunately I cannot get this to work. I really just need a recipe and then I can take it from there. I have an example that will respond to two persistent connections but anymore than that and the requests just are basically ignored until a process is cleared up. I probably should start with a single process example and go from there. Maybe I'm just confusing matters. It seems like it should be pretty simple.

Here's what I have so far. Please to be helping me out :)

#!/usr/bin/env perl use strict; use warnings; use Data::Dumper; use POSIX; use AnyEvent; use AnyEvent::Handle; use EV; use IO::Socket::INET(); my $socket = IO::Socket::INET->new( Listen => 5, ReuseAddr => 1, LocalPort => 8888, Proto => 'tcp' ); POSIX::setsid(); for my $child_num ( 1 .. 2 ) { my $pid = fork; if( $pid == 0 ) { warn $$, ' CHILD: started'; while( my $client = $socket->accept ) { my $cv = AnyEvent->condvar; $client->autoflush( 1 ); my( $peerhost, $peerport ) = ( $client->peerhost, $client- +>peerport ); my $handle = AnyEvent::Handle->new( fh => $client, on_drain => sub { warn $$, ' drain '; }, on_error => sub { print "Client connection error: $pee +rhost:$peerport: $!\n"; $cv->broadcast; }, ); my $read; $read = sub { my( $self, $line ) = @_; warn $$, ' got line ', $line; $self->push_read( line => $read ); }; $handle->push_read( line => $read,); $cv->recv; warn $$, ' $client done.'; } warn 'CHILD: done'; exit; } } while( ( my $pid = wait ) != -1 ) { warn 'REAPED: ', $pid; } warn 'PARENT: done'; exit;

Comment on Event based handling of multiple connections with AnyEvent
Download Code
Re: Event based handling of multiple connections with AnyEvent
by rcaputo (Chaplain) on Oct 29, 2008 at 00:29 UTC

    Your question seems confused. Do you want to handle multiple connections in a single process, or do you want to write a forking or pre-forking server like Apache's httpd? All three are varying levels of easy.

    Here's a single-process, event-driven way:

    #!/usr/bin/env perl use warnings; use strict; # Note: POE's default event loop uses select(). # See CPAN for more efficient POE::Loop classes. use POE; use POE::Component::Server::TCP; POE::Component::Server::TCP->new( Port => 8888, ClientConnected => sub { print "Client connected.\n"; }, ClientInput => sub { my ($app, $storage, $input) = @_[KERNEL, HEAP, ARG0]; print "Got client input: $input\n"; $storage->{client}->put($input); $app->yield("shutdown") if $input eq "quit"; }, ClientDisconnected => sub { print "Client disconnected.\n"; }, ); POE::Kernel->run();

    Here's another, adapted from Perl Cookbook recipe 17.11 (Forking Servers):

    #!/usr/bin/env perl use warnings; use strict; use POSIX qw(:sys_wait_h); use IO::Socket::INET(); use IO::Handle; sub reaper { 1 until waitpid(-1, WNOHANG) == -1; $SIG{CHLD} = \&reaper; } $SIG{CHLD} = \&reaper; my $server = IO::Socket::INET->new( Listen => 5, ReuseAddr => 1, LocalPort => 8888, Proto => 'tcp' ); while (1) { my $client; my $client_addr = accept($client, $server); next unless $client_addr; # Fork to handle the connection. my $pid = fork(); # Parent should just accept again. next if $pid; # Handling errors is good. die "fork failed: $!" unless defined $pid; # Child here. close($server); $client->autoflush(1); print "$$: client connected\n"; while (<$client>) { chomp; print "$$: got input: $_\n"; print $client "$_\n"; last if /^\s*quit\s*/; } print "$$: client disconnected\n"; close $client; exit; }

    Perl Cookbook recipe 17.12 (Pre-Forking Servers) is a couple pages long, so I'm not going to adapt it here. I believe the source is legally available online, and consider this incentive to buy a fine book.

Re: Event based handling of multiple connections with AnyEvent
by Anonymous Monk on Oct 29, 2008 at 03:56 UTC

    Thanks for that dngor! The first one is definitely what I meant I'm trying to do. Sorry for being too vague.

    I was able to get something working using LeoNerd's IO::Async framework. If anyone cares to take a look, let me know if there are any obvious problems:

    #####################3 server.pl use strict; use warnings; use diagnostics; use Time::HiRes qw(time); use IO::Socket::INET; use IO::Async::Stream; use IO::Async::Loop; my $loop = IO::Async::Loop->new(); my $server_socket = IO::Socket::INET->new( Listen => 5, ReuseAddr => 1, LocalPort => 8888, Proto => 'tcp', Blocking => 1, # This line is very important ) or die $!; $server_socket->autoflush( 1 ) or die $!; $loop->listen( handle => $server_socket, on_accept => sub { my ( $newclient ) = @_; $loop->add( IO::Async::Stream->new( handle => $newclient, on_read => sub { my ( $self, $buffref, $closed ) = @_; # $self->write( $$buffref ); warn 'READ: ', $$buffref; my $send_start = time; $newclient->send( 'ECHO: ' . $$buffref ); warn 'TIME TO SEND TO CLIENT: ', time - $send_star +t; $$buffref = ''; return 0; }, ), ); }, on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; + }, on_listen_error => sub { print STDERR "Cannot listen\n"; }, ); $loop->loop_forever();
    ######################## client.pl use strict; use warnings; use diagnostics; use Time::HiRes(); use IO::Socket::INET; my $time = time; while() { my $client_socket = IO::Socket::INET->new( PeerAddr => '127.0.0.1', PeerPort => '8888', Proto => 'tcp', ) or die $!; $client_socket->autoflush( 1 ) or die $!; my $send_start = Time::HiRes::time; sleep int rand 6; warn sprintf( "PID: %s, TIME TO SEND TO SERVER: %s, ELAPSED: %s\n" +, $$, Time::HiRes::time - $send_start, time - $time, ); $client_socket->send( sprintf( "PID: %s, TIME TO SEND TO SERVER: % +s, ELAPSED: %s\n", $$, Time::HiRes::time - $send_start, time - $time, + ) ); my $recv_start = time; sleep int rand 11; $client_socket->recv( my $buffer, 1024 ); warn sprintf 'TIME TO READ FROM SERVER %s: ELAPSED: %s GOT: %s', T +ime::HiRes::time - $recv_start, time - $time, $buffer; }

    If any other framework authors would care to chime in *cough*mlehmann*cough* please feel free!

      You can do better in the server. Let $loop->listen() set up the socket rather than doing it yourself. Also, don't use $newclient->send() as that may block; $self->write() will put data into the Stream's outbound queue to be delivered as normal.
      #####################3 server.pl
      use strict;
      use warnings;
      use diagnostics;
      use Time::HiRes qw(time);
      use IO::Async::Stream;
      use IO::Async::Loop;
      use Socket qw( SOCK_STREAM );
      
      my $loop = IO::Async::Loop->new();
      
      $loop->listen(
          service   => 8888,
          socktype  => SOCK_STREAM,
          queuesize => 5,
          reuseaddr => 1,
      
          on_accept => sub {
              my ( $newclient ) = @_;
      
              $loop->add(
                  IO::Async::Stream->new(
                      handle => $newclient,
      
                      on_read => sub {
                          my ( $self, $buffref, $closed ) = @_;
                          warn 'READ: ', $$buffref;
                          my $send_start = time;
                          $self->write( 'ECHO: ' . $$buffref );
                          warn 'TIME TO SEND TO CLIENT: ', time - $send_start;
                          $$buffref = '';
                          return 0;
                      },
                  ),
              );
          },
      
          on_resolve_error => sub { print STDERR "Cannot resolve - $_[0]\n"; },
          on_listen_error  => sub { print STDERR "Cannot listen\n"; },
      );
      
      $loop->loop_forever();
Re: Event based handling of multiple connections with AnyEvent
by bennymack (Pilgrim) on Nov 04, 2008 at 12:10 UTC

    Update: working AnyEvent version.

    #!/usr/bin/env perl use strict; use warnings; use Data::Dumper; use POSIX; use EV(); use AnyEvent(); use AnyEvent::Handle(); use IO::Socket::INET(); use Socket qw(SOCK_STREAM); my $server_socket = IO::Socket::INET->new( Listen => 5, ReuseAddr => 1, LocalPort => 8888, Blocking => 0, Type => SOCK_STREAM, ); $server_socket->autoflush( 1 ); my $child = AnyEvent->condvar; my $server_socket_watcher = AnyEvent->io( fh => $server_socket, poll => 'r', cb => sub { my $client_socket = $server_socket->accept; my $client = AnyEvent->condvar; my $handle = AnyEvent::Handle->new( fh => $client_socket, on_read => sub { my( $self ) = @_; warn $$, ' GOT: ', $self->rbuf; if( $self->rbuf =~ /QUIT/ ) { warn $$, ' QUITTING'; $client->send; } }, on_drain => sub { warn $$, ' drain '; }, on_error => sub { print "Client connection error\n"; $clie +nt->broadcast; }, ); $client->recv; warn $$, ' CLIENT: recv'; }, ); warn $$, ' $child->recv'; $child->recv; warn $$, ' CHILD: done'; exit;

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: perlquestion [id://720090]
Approved by Corion
Front-paged by Corion
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others chilling in the Monastery: (10)
As of 2014-09-18 12:09 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    How do you remember the number of days in each month?











    Results (113 votes), past polls