PerlMonks  

Switching POE::Filters mid-stream

by zengargoyle (Deacon)
on Oct 13, 2003 at 02:20 UTC ( #298747=perlquestion )
zengargoyle has asked for the wisdom of the Perl Monks concerning the following question:

this is my first POE attempt so please be lenient.

i'm trying to create a minimal XML/Jabber server for possible use in the PerlXPTrain project. just enough to accept connections and route messages between users of the one-and-only server. something to test with without having to install a full Jabber/XMPP server.

good news is that it works, i can connect with the examples/client.pl from Net::Jabber.

bad news is that it's ugly, because i can't for the life of me figure out how to change my InputFilter from Filter::Stream to a Filter::Stackable( Filters => [ Filter::Stream, Filter::XML ] )

i started with TCP_Forwarding from the POE_Cookbook for the general structure.

router_client_input
    handles the initial handshake, then yields to auth_setup
router_auth_setup
    the OutputFilter is no problem, it just works. the InputFilter needs to be Filter::Stream'd and Filter::XML'd.
router_auth_input
    i would like to be receiving a PXR::Node from my InputFilters, but instead i can only manage a Filter::Stream, so i use an ugly hack: i create a private instance of Filter::XML and send my input through it by hand to get a PXR::Node to work with.

i've tried...

  • starting with Filter::Stackable with a Filter::Stream and then pushing on a Filter::XML
  • starting with Filter::Stream, getting any pending input, creating a new Stackable with Stream, XML (sending the pending input to the Stream) and setting $wheel->set_input_filter($filter_stack)
  • and a few more variations of the above.

so, can anyone show me a way to handle the Filter switch?

#!/usr/bin/perl
use strict;
use warnings;
$|++;

use Socket;
use POSIX qw( errno_h );
use PXR::NS qw/ :IQ :JABBER /;
use POE qw(
    Wheel::ReadWrite
    Wheel::SocketFactory
    Filter::Stackable
    Filter::Stream
    Filter::XML
);

my %Config = (
    IP       => 'localhost',
    PORT     => 9999,
    HOSTNAME => 'localhost',
);

my %routes  = ();    # <id> => <wheel_client>
my %pending = ();    # <id> => [ <message>, ... ]

# Old Text Based Core
#
# input in this form -> '<from_id> <to_id> <message>'
#
# my ($from, $to, $msg) = split /\s+/, $input, 3;
#
# unless ( exists $routes{$from} ) {
#     $routes{$from} = $heap->{wheel_client};
# }
#
# if ( exists $pending{$from} ) {
#     $routes{$from}->put( $_ ) for @{ $pending{$from} };
# }
#
# unless ( exists $routes{$to} ) {
#     push @{ $pending{$to} }, $input;
# } else {
#     $routes{$to}->put( $input );
# }

sub router_create {
    my ( $handle, $peer_host, $peer_port, $remote_addr, $remote_port ) = @_;

    POE::Session->new(
        _start         => \&router_start,
        _stop          => \&router_stop,
        client_input   => \&router_client_input,
        client_error   => \&router_client_error,
        output_handler => \&router_output_handler,
        auth_setup     => \&router_auth_setup,
        auth_input     => \&router_auth_input,
        user_input     => \&router_user_input,
        [ $handle, $peer_host, $peer_port, $remote_addr, $remote_port ]
    );
}

sub router_start {
    my ( $heap, $session, $socket, $peer_host, $peer_port, $remote_addr, $remote_port ) =
        @_[ HEAP, SESSION, ARG0, ARG1, ARG2, ARG3, ARG4 ];

    $heap->{log} = $session->ID;

    $peer_host = inet_ntoa($peer_host);
    $heap->{peer_host}   = $peer_host;
    $heap->{peer_port}   = $peer_port;
    $heap->{remote_addr} = $remote_addr;
    $heap->{remote_port} = $remote_port;

    print "[$heap->{log}] Accepted session from $peer_host:$peer_port$/";

    $heap->{socket} = $socket;
    $heap->{state}  = 'connecting';

    $heap->{wheel_client} = POE::Wheel::ReadWrite->new(
        Handle       => $socket,
        Driver       => POE::Driver::SysRW->new,
        InputFilter  => POE::Filter::Stream->new,
        OutputFilter => POE::Filter::Stream->new,
        InputEvent   => 'client_input',
        ErrorEvent   => 'client_error',
    );
}

# match the client's <stream:stream>
# send our <stream:stream>
sub router_client_input {
    my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    print "[$heap->{log}] said: $input$/";

    # XXX come up with better test.
    if ( $input =~ /<stream:stream\s/ ) {
        $kernel->call( $session, output_handler => <<_X_ );
<?xml version="1.0"?>
<stream:stream xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" from="localhost">
_X_
        $kernel->yield('auth_setup');
    }
}

# try to change the Filters on our Wheel
sub router_auth_setup {
    my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];

    print "Changing Filter$/";
    my $if = $heap->{wheel_client}->get_input_filter;
    print "pending: ", $if->get_pending, $/;
    $heap->{wheel_client}->set_output_filter( POE::Filter::XML->new );
    # $heap->{wheel_client}->set_input_filter(
    #     POE::Filter::Stackable->new( Filters => [
    #         POE::Filter::Stream->new(
    #             $heap->{wheel_client}->get_input_filter->get_pending
    #         ),
    #         POE::Filter::XML->new(
    #         ),
    #     ])
    # );
    print "Changed Filter$/";
    $heap->{wheel_client}->event( InputEvent => 'auth_input' );
}

sub DD {
    require Data::Dumper;
    print Data::Dumper->Dump(@_);
}

# we should be passing PXR::Node as input but Filter swap
# has me miffed.
sub router_auth_input {
    my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    my $node;
    if (0) {
        $node = $input;
    } else {
        # one butt ugly hack
        my $flt = POE::Filter::XML->new($input);
        print "[$heap->{log}] noded: ", $input, $/;
        my $nodes = $flt->get_one( [] );
        $node = $nodes->[0];
    }
    #DD( [$node], ['node'] );
    print "[$heap->{log}] noded: ", $node->to_str, $/;

    # create our reply node
    my $reply = PXR::Node->new('iq');
    $reply->attr( type => +IQ_RESULT );
    $reply->attr( id   => $node->attr('id') );

    if ( $node->name eq 'iq' ) {
        my $q = $reply->insert_tag( query => +NS_JABBER_AUTH );
        if ( $node->attr('type') eq +IQ_GET ) {
            # ask for username/password
            $q->insert_tag('username');
            $q->insert_tag('password');
        } elsif ( $node->attr('type') eq +IQ_SET ) {
            # XXX do real auth check on username/password here.
            # we are authed!
            $heap->{wheel_client}->event( InputEvent => 'user_input' );
        }
    } else {
        # XXX check actual format for errors.
        $reply->attr( type => 'error' );
    }
    print "[$heap->{log}] reply: ", $reply->to_str, $/;
    $kernel->post( $session, output_handler => $reply );
}

# this should be our final handler.
# we'll dispatch each node to its final destination
sub router_user_input {
    my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    my $node;
    if (0) {
        $node = $input;
    } else {
        $input =~ s/^\s+//;
        $node = ( POE::Filter::XML->new($input)->get_one( [] ) )->[0] if length $input;
    }
    if ( defined $node ) {
        print "[$heap->{log}] user: ", $node->to_str, $/;
        # XXX route node to destination.
    } else {
        # Net::Jabber::Client keep-alive sends spaces.
        # These make empty nodes, we'll send a message to
        # client for a test.
        my $reply = PXR::Node->new('message');
        $reply->attr( type => 'normal' );
        $reply->attr( to   => 'user2@localhost' );
        $reply->attr( from => 'user1@localhost' );
        $reply->insert_tag( 'body' )->data( q{woot!} );
        $reply->insert_tag( 'subject' )->data( q{this is a test} );
        $kernel->post( $session, output_handler => $reply );
    }
}

sub router_output_handler {
    my ( $heap, $data ) = @_[ HEAP, ARG0 ];
    $heap->{wheel_client}->put($data);
}

sub router_stop {
    my $heap = $_[HEAP];
    print "[$heap->{log}] Closing session\n";
}

sub router_client_error {
    my ( $kernel, $heap, $operation, $errnum, $errstr ) =
        @_[ KERNEL, HEAP, ARG0, ARG1, ARG2 ];

    if ($errnum) {
        print "[$heap->{log}] Client connection encountered $operation error $errnum: $errstr$/";
    } else {
        print "[$heap->{log}] Client closed connection$/";
    }
    delete $heap->{wheel_client};
}

sub server_create {
    POE::Session->new(
        _start         => \&server_start,
        _stop          => \&server_stop,
        accept_success => \&server_accept_success,
        accept_failure => \&server_accept_failure,
    );
}

sub server_start {
    my ($heap) = @_[HEAP];

    print "Starting server$/";

    $heap->{server_wheel} = POE::Wheel::SocketFactory->new(
        BindAddress  => $Config{IP},
        BindPort     => $Config{PORT},
        Reuse        => 'yes',
        SuccessEvent => 'accept_success',
        FailureEvent => 'accept_failure',
    );
}

sub server_stop {
    my ($heap) = @_[HEAP];
    print "Stopping server$/";
}

sub server_accept_success {
    my ( $heap, $socket, $peer_addr, $peer_port ) = @_[ HEAP, ARG0 .. ARG2 ];
    router_create( $socket, $peer_addr, $peer_port, $heap->{remote_addr}, $heap->{remote_port} );
}

sub server_accept_failure {
    my ( $heap, $operation, $errnum, $errstr ) = @_[ HEAP, ARG0, ARG1, ARG2 ];

    print "Failure: $heap->{remote_addr} $heap->{remote_port} encountered $operation error $errnum: $errstr$/";

    delete $heap->{server_wheel} if $errnum == ENFILE or $errnum == EMFILE;
}

server_create();
POE::Kernel->run();

__END__

$ perl testserver.pl
Starting server
[3] Accepted session from 127.0.0.1:41208
[3] said: <?xml version="1.0"?><stream:stream xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" to="localhost" from="lapcat" >
Changing Filter
Changed Filter
[3] noded: <iq id='netjabber-0' type='get'><query xmlns='jabber:iq:auth'><username>username</username></query></iq>
[3] noded: <iq id='netjabber-0' type='get'><query xmlns='jabber:iq:auth'><username>username</username></query></iq>
[3] reply: <iq id='netjabber-0' type='result'><query xmlns='jabber:iq:auth'><username/><password/></query></iq>
[3] noded: <iq id='netjabber-1' type='set'><query xmlns='jabber:iq:auth'><password>password</password><resource>resource</resource><username>username</username></query></iq>
[3] noded: <iq id='netjabber-1' type='set'><query xmlns='jabber:iq:auth'><password>password</password><resource>resource</resource><username>username</username></query></iq>
[3] reply: <iq id='netjabber-1' type='result'><query xmlns='jabber:iq:auth'/></iq>
[3] user: <iq id='netjabber-2' type='get'><query xmlns='jabber:iq:roster'/></iq>
[3] Closing session
Stopping server

$ perl client.pl localhost 9999 username password resource
Logged in to localhost:9999...
===
Message (normal)
  From: user1 ()
  Subject: this is a test
  Body: woot!
===
<message from='user1@localhost' to='user2@localhost' type='normal'><body>woot!</body><subject>this is a test</subject></message>
===
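
A note on the second approach from the "i've tried" list (grab the pending input, then install a new filter): the hand-off itself can be sketched with filters that ship with POE, no kernel or socket needed. This is only an illustration, not a fix for the server above. It assumes POE::Filter::Line as a stand-in for POE::Filter::XML, since how the Filter::XML release used in this post buffers input is not confirmed here, and the sample strings are made up. Current POE documentation also describes set_input_filter on POE::Wheel::ReadWrite as preserving buffered data across the swap, so the manual version may not be needed at all.

```perl
use strict;
use warnings;
use POE::Filter::Stream;
use POE::Filter::Line;

# Old filter: it holds bytes that arrived but were never consumed.
my $old = POE::Filter::Stream->new;
$old->get_one_start( [ "<iq id='1'/>\n<iq id=" ] );

# get_pending returns an array reference of raw chunks, or undef if empty.
my $pending = $old->get_pending;

# New filter (hypothetical stand-in for Filter::XML): seed it with the
# leftovers before any new input reaches it.
my $new = POE::Filter::Line->new( InputLiteral => "\n" );
$new->get_one_start($pending) if $pending;

# Input that arrives after the switch goes straight to the new filter.
$new->get_one_start( [ "'2'/>\n" ] );

# Drain every complete record the new filter can produce.
my @records;
while ( my @got = @{ $new->get_one } ) {
    push @records, @got;
}
print "$_\n" for @records;
```

The point of the sketch is the ordering: leftovers from the old filter go into the new one first, so a record split across the switch is reassembled rather than lost.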

Re: Switching POE::Filters mid-stream
by revdiablo (Prior) on Oct 13, 2003 at 23:31 UTC
Re: Switching POE::Filters mid-stream
by Flame (Deacon) on Nov 17, 2003 at 03:31 UTC

    I'm relatively clueless when it comes to POE, though I'm trying to figure it out myself right now, but I have one question... why do you need to stack the Stream and XML filters? I have used the stacked filter mechanism before... in fact I'm using it now within a Component::Server::TCP.

    Per the docs for Filter::Stream, it does nothing... absolutely nothing... so why not simply switch to Filter::XML?

    If I'm wrong, I'm wrong, but please explain why...





    My code doesn't have bugs, it just develops random features.

    Flame
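
Flame's point about Filter::Stream being a pass-through can be checked without a network connection. The sketch below feeds the same chunks through a bare filter and through a Filter::Stackable that puts Filter::Stream in front of it, then compares the results. POE::Filter::Line is used as an assumed stand-in for POE::Filter::XML (so this says nothing about Filter::XML's own behaviour), and the input strings are invented for the example.

```perl
use strict;
use warnings;
use POE::Filter::Stream;
use POE::Filter::Line;
use POE::Filter::Stackable;

# Two chunks, split mid-record the way a socket read might split them.
my @chunks = ( "alpha\nbe", "ta\ngam" );

# The parsing filter on its own.
my $plain = POE::Filter::Line->new( InputLiteral => "\n" );

# The same parsing filter with Filter::Stream stacked in front of it.
my $stacked = POE::Filter::Stackable->new(
    Filters => [
        POE::Filter::Stream->new,
        POE::Filter::Line->new( InputLiteral => "\n" ),
    ],
);

# get() takes an array reference of raw chunks and returns an array
# reference of complete records; the trailing "gam" stays buffered.
my $from_plain   = $plain->get(   [@chunks] );
my $from_stacked = $stacked->get( [@chunks] );

print "plain:   @$from_plain\n";
print "stacked: @$from_stacked\n";
print "identical\n" if "@$from_plain" eq "@$from_stacked";
```

If the two lists match, the Stream layer contributes nothing to the parse, which would support installing the parsing filter directly instead of building a stack.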

Node Type: perlquestion [id://298747]
Approved by Thelonius
Front-paged by broquaint