PerlMonks  

Re: Architecture advice, proxy or rebroadcast websocket

by haukex (Chancellor)
on Jan 17, 2020 at 16:31 UTC (#11111532)


in reply to Architecture advice, proxy or rebroadcast websocket

Personally, I'd implement your second suggestion (a multiplexer in Mojo), with the fourth suggestion (DB) in second place; the other two sound too complicated for my tastes ;-) Here's how I might have implemented the second option, with the sample server and client based on your code in node 11110853:

serv.pl:

#!/usr/bin/env perl
use Mojolicious::Lite -signatures;
use Data::Dump qw/dd pp/;

# morbo --listen='http://localhost:3000' serv.pl

# This code assumes a single thread / no forking!
my $single_client;

websocket '/' => sub ($c) {
    if ( defined $single_client ) {
        $c->app->log->debug('WebSocket opened, rejecting');
        $c->send("Too many client connections");
        $c->tx->finish;
        return;
    }
    $c->app->log->debug('WebSocket opened');
    $c->inactivity_timeout(45);
    $single_client = $c->tx;
    my $timer_id;
    $c->on(message => sub ($c, $msg) {
        if ( uc $msg eq "RUN" ) {
            if ( !defined $timer_id ) {
                $c->send("Starting to run");
                $timer_id = Mojo::IOLoop->recurring( 1 => sub {
                    # pretend to be a fragile server
                    if ( rand > 0.97 ) {
                        $c->app->log->warn('Oops, disconnecting!');
                        $single_client->finish;
                    }
                    elsif ( rand > 0.95 ) {
                        $c->app->log->warn('Oops, freezing!');
                        Mojo::IOLoop->remove($timer_id);
                    }
                    else { $c->send("Running ".(time-$^T)) }
                } );
            }
            else { $c->send("Already running") }
        }
        elsif ( uc $msg eq "STOP" ) {
            if ( defined $timer_id ) {
                $c->send("Stopping...");
                Mojo::IOLoop->remove($timer_id);
                $timer_id = undef;
            }
            else { $c->send("Not running") }
        }
        elsif ( uc $msg eq "HEARTBEAT" ) { }
        else { $c->send("Unknown command: $msg") }
    });
    $c->on(finish => sub ($c, $code, $reason=undef) {
        $c->app->log->debug("WebSocket closed ".pp($code, $reason));
        Mojo::IOLoop->remove($timer_id);
        $single_client = undef;
    });
    $c->send("Connected... RUN|STOP");
};

app->start

cli.pl:

#!/usr/bin/env perl
use Mojo::Base -strict, -signatures;
use Mojo::UserAgent;
use Mojo::IOLoop;
use Data::Dump qw/dd pp/;

# To connect to mux.pl: perl cli.pl ws://localhost:3001
my $ADDR = @ARGV ? $ARGV[0] : 'ws://localhost:3000';

say "Opening WebSocket...";
my $ua = Mojo::UserAgent->new;
$ua->inactivity_timeout(10);  # must be more than mux's timeout
$ua->websocket($ADDR => sub ($ua, $tx) {
    die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;
    my $timer_id = Mojo::IOLoop->recurring( 30 => sub {
        say "Sending heartbeat";
        $tx->send('HEARTBEAT');
    } );
    $tx->on(finish => sub ($tx, $code, $reason=undef) {
        say "WebSocket closed ".pp($code, $reason);
        Mojo::IOLoop->remove($timer_id);
    });
    $tx->on(message => sub ($tx, $msg) {
        say "RX: ".pp($msg);
    });
    $tx->send('RUN');
});

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

mux.pl:

#!/usr/bin/env perl
use Mojolicious::Lite -signatures;
use Mojo::UserAgent;
use Mojo::EventEmitter;
use Data::Dump qw/dd pp/;

# morbo --listen='http://localhost:3001' mux.pl

# This code assumes a single thread / no forking!

my $event = Mojo::EventEmitter->new;

sub open_server_conn {
    state $ua = Mojo::UserAgent->new;
    $ua->inactivity_timeout(5);  # must be less than client's timeout
    app->log->debug("Opening WebSocket to server...");
    $ua->websocket('ws://localhost:3000' => sub ($ua, $tx) {
        die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;
        my $timer_id = Mojo::IOLoop->recurring( 30 => sub {
            app->log->debug("Sending heartbeat");
            $tx->send('HEARTBEAT');
        } );
        $tx->on(finish => sub ($tx, $code, $reason=undef) {
            app->log->debug("Server WebSocket closed "
                .pp($code, $reason));
            Mojo::IOLoop->remove($timer_id);
            open_server_conn();  # reopen
        });
        $tx->on(message => sub ($tx, $msg) {  # message from server
            return if $msg=~/^Connected\b|^Starting\b/;
            return unless $event->has_subscribers('message');
            app->log->debug("Relaying ".pp($msg));
            $event->emit( message => $msg );
        });
        $tx->send('RUN');
    });
}
Mojo::IOLoop->next_tick(\&open_server_conn);

websocket '/' => sub ($c) {  # client connections
    $c->app->log->debug('Client connected');
    $c->inactivity_timeout(45);
    my $tx = $c->tx;  # client transaction
    my $cb = $event->on(message => sub ($e,$msg) { $tx->send($msg) });
    $c->on(message => sub ($c, $msg) {  # message from client
        if ( uc $msg eq "RUN" ) { }  # ignore client messages (for now)
        elsif ( uc $msg eq "STOP" ) { }
        elsif ( uc $msg eq "HEARTBEAT" ) { }
        else { $tx->send("Unknown command: $msg") }
    });
    $c->on(finish => sub ($c, $code, $reason=undef) {
        $c->app->log->debug("Client disconn ".pp($code, $reason));
        $event->unsubscribe(message => $cb);
        ($tx,$cb) = ();
    });
    $c->send("Connected... RUN|STOP");
};

app->start

Note how in the multiplexer, I'm using a Mojo::EventEmitter to decouple the server messages from the client connections. Of course, the code makes a few assumptions that you are free to adapt, such as whether to ignore or forward messages coming from each client to the server (in the latter case, you could also relay them via the event emitter using a different event name, for example), and more generally how transparent you want the multiplexer to be (or not).
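To illustrate the "different event name" idea, here is a minimal sketch of how one emitter could carry traffic in both directions. It runs without any network connections: the arrays stand in for the `$tx->send` calls in mux.pl, and the event name `client_message` and the helper `connect_client` are made up for this example, not part of the code above.

```perl
#!/usr/bin/env perl
use Mojo::Base -strict, -signatures;
use Mojo::EventEmitter;

# Stand-in for the $event object in mux.pl
my $event = Mojo::EventEmitter->new;

my @to_server;   # what the server-side $tx->send would have received
my %to_client;   # what each client's $tx->send would have received

# Server side of the mux: forward whatever clients emit upstream.
# 'client_message' is a hypothetical event name chosen for this sketch.
$event->on(client_message => sub ($e, $msg) { push @to_server, $msg });

# Client side: each connection subscribes to the server's messages.
# The returned callback is needed later to unsubscribe on disconnect.
sub connect_client ($name) {
    return $event->on(message => sub ($e, $msg) {
        push @{ $to_client{$name} }, $msg;
    });
}

my $cb_a = connect_client('a');
my $cb_b = connect_client('b');

$event->emit(message        => 'Running 1');  # server -> both clients
$event->emit(client_message => 'STOP');       # a client -> server only
$event->unsubscribe(message => $cb_b);        # client b disconnects
$event->emit(message        => 'Running 2');  # server -> client a only

say "to server: @to_server";
say "to client $_: @{ $to_client{$_} }" for sort keys %to_client;
```

In mux.pl itself, the `client_message` subscriber would presumably live inside the `$ua->websocket` callback and call `$tx->send($msg)` on the server transaction, and it would need to be unsubscribed in that transaction's `finish` handler so that a reconnect does not leave a stale subscriber behind.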

Re^2: Architecture advice, proxy or rebroadcast websocket
by Your Mother (Bishop) on Jan 17, 2020 at 17:52 UTC

    You are a hero.
