PerlMonks  

Switching POE::Filters mid-stream

by zengargoyle (Deacon)
on Oct 13, 2003 at 02:20 UTC ( #298747=perlquestion )
zengargoyle has asked for the wisdom of the Perl Monks concerning the following question:

this is my first POE attempt so please be lenient.

i'm trying to create a minimal XML/Jabber server for possible use in the PerlXPTrain project. just enough to accept connections and route messages between users of the one-and-only server. something to test with without having to install a full Jabber/XMPP server.

good news is that it works, i can connect with the examples/client.pl from Net::Jabber.

bad news is that it's ugly, because i can't for the life of me figure out how to change my InputFilter from Filter::Stream to a Filter::Stackable( Filters => [ Filter::Stream, Filter::XML ] )

i started with TCP_Forwarding from the POE_Cookbook for the general structure.

router_client_input
    handles the initial handshake, then yields to auth_setup
router_auth_setup
    the OutputFilter is no problem, it just works. the InputFilter needs to be Filter::Stream'd and Filter::XML'd.
router_auth_input
    i would like to be receiving a PXR::Node from my InputFilters, but instead i can only manage a Filter::Stream, so i use an ugly hack: i create a private instance of Filter::XML and send my input through it by hand to get a PXR::Node to work with.

i've tried...

  • starting with Filter::Stackable with a Filter::Stream and then pushing on a Filter::XML
  • starting with Filter::Stream, getting any pending input, creating a new Stackable with Stream, XML (sending the pending input to the Stream) and setting $wheel->set_input_filter($filter_stack)
  • and a few more variations of the above.

so, can anyone show me a way to handle the Filter switch?
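The second attempt in the list above (collect the old filter's pending input, then feed it to the replacement) can be sketched without POE installed. `TinyLineFilter` and `hand_off_pending` are hypothetical names made up for this illustration; the stand-in class only mimics the `get_one_start`, `get_one` and `get_pending` calls of the POE::Filter interface and is not part of POE. It is a sketch of the handoff idea under those assumptions, not the actual fix for the server below.

```perl
use strict;
use warnings;

# Hypothetical stand-in that mimics the POE::Filter calls used below
# (get_one_start, get_one, get_pending); it is not part of POE.
package TinyLineFilter;

sub new { my $class = shift; return bless { buffer => '' }, $class }

# Append raw chunks to the internal buffer.
sub get_one_start {
    my ( $self, $chunks ) = @_;
    $self->{buffer} .= join '', @$chunks;
    return;
}

# Return at most one complete line (without its newline) in an array ref.
sub get_one {
    my $self = shift;
    return [$1] if $self->{buffer} =~ s/\A([^\n]*)\n//;
    return [];
}

# Return unparsed data as an array ref, or undef when nothing is buffered.
sub get_pending {
    my $self = shift;
    return length $self->{buffer} ? [ $self->{buffer} ] : undef;
}

package main;

# Move whatever the old filter still has buffered into the new filter.
sub hand_off_pending {
    my ( $old, $new ) = @_;
    my $pending = $old->get_pending;
    $new->get_one_start($pending) if $pending;
    return $new;
}

my $old = TinyLineFilter->new;
$old->get_one_start( ["handshake\n<iq id='1'/>\n"] );
my $handshake = $old->get_one->[0];    # consumed by the first handler

my $new  = hand_off_pending( $old, TinyLineFilter->new );
my $next = $new->get_one->[0];         # data that arrived with the handshake

print "handshake: $handshake\nnext: $next\n";
```

The point of the handoff is that anything that arrived in the same read as the handshake is not lost when the filter changes. A filter that returns its whole buffer on every read would normally have nothing pending to hand over.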

#!/usr/bin/perl
use strict;
use warnings;
$|++;

use Socket;
use POSIX qw( errno_h );
use PXR::NS qw/ :IQ :JABBER /;
use POE qw(
  Wheel::ReadWrite
  Wheel::SocketFactory
  Filter::Stackable
  Filter::Stream
  Filter::XML
);

my %Config = (
    IP       => 'localhost',
    PORT     => 9999,
    HOSTNAME => 'localhost',
);

my %routes  = ();    # <id> => <wheel_client>
my %pending = ();    # <id> => [ <message>, ... ]

# Old Text Based Core
#
# input in this form -> '<from_id> <to_id> <message>'
#
#    my ($from, $to, $msg) = split /\s+/, $input, 3;
#
#    unless ( exists $routes{$from} ) {
#        $routes{$from} = $heap->{wheel_client};
#    }
#
#    if ( exists $pending{$from} ) {
#        $routes{$from}->put( $_ ) for @{ $pending{$from} };
#    }
#
#    unless ( exists $routes{$to} ) {
#        push @{ $pending{$to} }, $input;
#    } else {
#        $routes{$to}->put( $input );
#    }

sub router_create {
    my ( $handle, $peer_host, $peer_port, $remote_addr, $remote_port ) = @_;

    POE::Session->new(
        _start         => \&router_start,
        _stop          => \&router_stop,
        client_input   => \&router_client_input,
        client_error   => \&router_client_error,
        output_handler => \&router_output_handler,
        auth_setup     => \&router_auth_setup,
        auth_input     => \&router_auth_input,
        user_input     => \&router_user_input,

        [ $handle, $peer_host, $peer_port, $remote_addr, $remote_port ]
    );
}

sub router_start {
    my ( $heap, $session, $socket, $peer_host, $peer_port, $remote_addr,
        $remote_port )
      = @_[ HEAP, SESSION, ARG0, ARG1, ARG2, ARG3, ARG4 ];

    $heap->{log} = $session->ID;

    $peer_host           = inet_ntoa($peer_host);
    $heap->{peer_host}   = $peer_host;
    $heap->{peer_port}   = $peer_port;
    $heap->{remote_addr} = $remote_addr;
    $heap->{remote_port} = $remote_port;

    print "[$heap->{log}] Accepted session from $peer_host:$peer_port$/";

    $heap->{socket} = $socket;
    $heap->{state}  = 'connecting';

    $heap->{wheel_client} = POE::Wheel::ReadWrite->new(
        Handle       => $socket,
        Driver       => POE::Driver::SysRW->new,
        InputFilter  => POE::Filter::Stream->new,
        OutputFilter => POE::Filter::Stream->new,
        InputEvent   => 'client_input',
        ErrorEvent   => 'client_error',
    );
}

# match the client's <stream:stream>
# send our <stream:stream>
sub router_client_input {
    my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    print "[$heap->{log}] said: $input$/";

    # XXX come up with better test.
    if ( $input =~ /<stream:stream\s/ ) {
        $kernel->call( $session, output_handler => <<_X_ );
<?xml version="1.0"?>
<stream:stream xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" from="localhost">
_X_
        $kernel->yield('auth_setup');
    }
}

# try to change the Filters on our Wheel
sub router_auth_setup {
    my ( $kernel, $session, $heap ) = @_[ KERNEL, SESSION, HEAP ];

    print "Changing Filter$/";

    my $if = $heap->{wheel_client}->get_input_filter;
    print "pending: ", $if->get_pending, $/;

    $heap->{wheel_client}->set_output_filter( POE::Filter::XML->new );

    #    $heap->{wheel_client}->set_input_filter(
    #        POE::Filter::Stackable->new( Filters => [
    #            POE::Filter::Stream->new(
    #                $heap->{wheel_client}->get_input_filter->get_pending
    #            ),
    #            POE::Filter::XML->new(
    #            ),
    #        ])
    #    );

    print "Changed Filter$/";

    $heap->{wheel_client}->event( InputEvent => 'auth_input' );
}

sub DD {
    require Data::Dumper;
    print Data::Dumper->Dump(@_);
}

# we should be passing PXR::Node as input but Filter swap
# has me miffed.
sub router_auth_input {
    my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    my $node;
    if (0) {
        $node = $input;
    }
    else {
        # one butt ugly hack
        my $flt = POE::Filter::XML->new($input);
        print "[$heap->{log}] noded: ", $input, $/;
        my $nodes = $flt->get_one( [] );
        $node = $nodes->[0];
    }

    #DD( [$node], ['node'] );
    print "[$heap->{log}] noded: ", $node->to_str, $/;

    # create our reply node
    my $reply = PXR::Node->new('iq');
    $reply->attr( type => +IQ_RESULT );
    $reply->attr( id   => $node->attr('id') );

    if ( $node->name eq 'iq' ) {
        my $q = $reply->insert_tag( query => +NS_JABBER_AUTH );

        if ( $node->attr('type') eq +IQ_GET ) {

            # ask for username/password
            $q->insert_tag('username');
            $q->insert_tag('password');
        }
        elsif ( $node->attr('type') eq +IQ_SET ) {

            # XXX do real auth check on username/password here.
            # we are authed!
            $heap->{wheel_client}->event( InputEvent => 'user_input' );
        }
    }
    else {
        # XXX check actual format for errors.
        $reply->attr( type => 'error' );
    }

    print "[$heap->{log}] reply: ", $reply->to_str, $/;
    $kernel->post( $session, output_handler => $reply );
}

# this should be our final handler.
# we'll dispatch each node to its final destination
sub router_user_input {
    my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ];

    my $node;
    if (0) {
        $node = $input;
    }
    else {
        $input =~ s/^\s+//;
        $node = ( POE::Filter::XML->new($input)->get_one( [] ) )->[0]
          if length $input;
    }

    if ( defined $node ) {
        print "[$heap->{log}] user: ", $node->to_str, $/;

        # XXX route node to destination.
    }
    else {
        # Net::Jabber::Client keep-alive sends spaces.
        # These make empty nodes, we'll send a message to
        # client for a test.
        my $reply = PXR::Node->new('message');
        $reply->attr( type => 'normal' );
        $reply->attr( to   => 'user2@localhost' );
        $reply->attr( from => 'user1@localhost' );
        $reply->insert_tag('body')->data(q{woot!});
        $reply->insert_tag('subject')->data(q{this is a test});
        $kernel->post( $session, output_handler => $reply );
    }
}

sub router_output_handler {
    my ( $heap, $data ) = @_[ HEAP, ARG0 ];
    $heap->{wheel_client}->put($data);
}

sub router_stop {
    my $heap = $_[HEAP];
    print "[$heap->{log}] Closing session\n";
}

sub router_client_error {
    my ( $kernel, $heap, $operation, $errnum, $errstr ) =
      @_[ KERNEL, HEAP, ARG0, ARG1, ARG2 ];

    if ($errnum) {
        print "[$heap->{log}] Client connection encountered ",
          "$operation error $errnum: $errstr$/";
    }
    else {
        print "[$heap->{log}] Client closed connection$/";
    }

    delete $heap->{wheel_client};
}

sub server_create {
    POE::Session->new(
        _start         => \&server_start,
        _stop          => \&server_stop,
        accept_success => \&server_accept_success,
        accept_failure => \&server_accept_failure,
    );
}

sub server_start {
    my ($heap) = @_[HEAP];

    print "Starting server$/";

    $heap->{server_wheel} = POE::Wheel::SocketFactory->new(
        BindAddress  => $Config{IP},
        BindPort     => $Config{PORT},
        Reuse        => 'yes',
        SuccessEvent => 'accept_success',
        FailureEvent => 'accept_failure',
    );
}

sub server_stop {
    my ($heap) = @_[HEAP];
    print "Stopping server$/";
}

sub server_accept_success {
    my ( $heap, $socket, $peer_addr, $peer_port ) = @_[ HEAP, ARG0 .. ARG2 ];
    router_create( $socket, $peer_addr, $peer_port,
        $heap->{remote_addr}, $heap->{remote_port} );
}

sub server_accept_failure {
    my ( $heap, $operation, $errnum, $errstr ) = @_[ HEAP, ARG0, ARG1, ARG2 ];

    print "Failure: $heap->{remote_addr} $heap->{remote_port} ",
      "encountered $operation error $errnum: $errstr$/";

    delete $heap->{server_wheel} if $errnum == ENFILE or $errnum == EMFILE;
}

server_create();
POE::Kernel->run();

__END__

$ perl testserver.pl
Starting server
[3] Accepted session from 127.0.0.1:41208
[3] said: <?xml version="1.0"?><stream:stream xmlns:stream="http://etherx.jabber.org/streams" xmlns="jabber:client" to="localhost" from="lapcat" >
Changing Filter
Changed Filter
[3] noded: <iq id='netjabber-0' type='get'><query xmlns='jabber:iq:auth'><username>username</username></query></iq>
[3] noded: <iq id='netjabber-0' type='get'><query xmlns='jabber:iq:auth'><username>username</username></query></iq>
[3] reply: <iq id='netjabber-0' type='result'><query xmlns='jabber:iq:auth'><username/><password/></query></iq>
[3] noded: <iq id='netjabber-1' type='set'><query xmlns='jabber:iq:auth'><password>password</password><resource>resource</resource><username>username</username></query></iq>
[3] noded: <iq id='netjabber-1' type='set'><query xmlns='jabber:iq:auth'><password>password</password><resource>resource</resource><username>username</username></query></iq>
[3] reply: <iq id='netjabber-1' type='result'><query xmlns='jabber:iq:auth'/></iq>
[3] user: <iq id='netjabber-2' type='get'><query xmlns='jabber:iq:roster'/></iq>
[3] Closing session
Stopping server

$ perl client.pl localhost 9999 username password resource
Logged in to localhost:9999...
===
Message (normal)
From: user1 ()
Subject: this is a test
Body: woot!
===
<message from='user1@localhost' to='user2@localhost' type='normal'><body>woot!</body><subject>this is a test</subject></message>
===

Re: Switching POE::Filters mid-stream
by revdiablo (Prior) on Oct 13, 2003 at 23:31 UTC
Re: Switching POE::Filters mid-stream
by Flame (Deacon) on Nov 17, 2003 at 03:31 UTC

    I'm relatively clueless when it comes to POE, though I'm trying to figure it out myself right now, but I have one question... why do you need to stack the Stream and XML filters? I have used the stacked filter mechanism before... in fact I'm using it now within a Component::Server::TCP.

    Per the docs for Filter::Stream, it does nothing... absolutely nothing... so why not simply switch to Filter::XML?

    If I'm wrong, I'm wrong, but please explain why...





    My code doesn't have bugs, it just develops random features.

    Flame
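The suggestion in this reply, replacing the wheel's input filter with a single XML filter once the handshake is done, might look roughly like the sketch below. `switch_input` is a hypothetical helper name; `set_input_filter` and `event` are the POE::Wheel::ReadWrite methods already used in the question's code. The POE documentation describes `set_input_filter` as passing input buffered in the old filter on to the new one, but whether that holds for the POE release used in the question is an assumption here, and the approach is untested against that server.

```perl
use strict;
use warnings;

# Hypothetical helper: swap the input filter on a wheel and route the
# parsed input to a different event. $wheel is assumed to be a
# POE::Wheel::ReadWrite, $filter any POE::Filter object.
sub switch_input {
    my ( $wheel, $filter, $event ) = @_;

    # Replace the parser for incoming data.
    $wheel->set_input_filter($filter);

    # Deliver whatever the new filter produces to the next handler.
    $wheel->event( InputEvent => $event );

    return;
}

# Assumed usage inside router_auth_setup from the question:
#
#   switch_input( $heap->{wheel_client}, POE::Filter::XML->new, 'auth_input' );
```

If this works, router_auth_input would receive PXR::Node objects directly and the private Filter::XML instance in that handler would no longer be needed.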
