# This file holds two small WebSocket client scripts, separated by the
# "####" marker further down:
#   1. a consumer that prints every JSON message it receives and
#      acknowledges it by sending the message's "messageId" back;
#   2. a producer that sends one base64-encoded test message and closes
#      the socket once a JSON reply arrives.
# NOTE(review): the two parts look like independent programs that were
# pasted into one file; each one is wrapped in its own bare block so that
# their lexicals ($ADDR, $ua) do not mask each other. Confirm whether they
# should live in separate files.
# NOTE(review): the endpoint paths resemble the Apache Pulsar WebSocket
# API -- confirm against the server actually used.

use Mojo::Base -strict, -signatures;
use Mojo::UserAgent;
use Mojo::IOLoop;
use Mojo::Util qw/b64_decode/;
use Data::Dump qw/dd pp/;

{
    my $ADDR = 'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub';

    # Set when the handshake fails, so the script can exit non-zero once the
    # event loop has stopped.
    my $failed = 0;

    say "Opening WebSocket...";
    my $ua = Mojo::UserAgent->new;
    $ua->websocket($ADDR => sub ($ua, $tx) {

        # An exception thrown inside this non-blocking callback is caught by
        # the reactor and only reported as a warning, so die() cannot abort
        # the script here. Report the transaction error and remember the
        # failure instead.
        unless ($tx->is_websocket) {
            my $err    = $tx->error // {};
            my $detail = join ' ', grep { defined } @{$err}{qw/code message/};
            warn "WebSocket handshake failed: ",
                ($detail || 'no error details available'), "\n";
            $failed = 1;
            return;
        }

        $tx->on(finish => sub ($tx, $code, $reason=undef) {
            say "WebSocket closed ", pp($code, $reason);
        });

        $tx->on(json => sub ($tx, $json) {
            say "Received: ", pp($json);

            # Default to an empty string so a frame without "payload" does
            # not trigger an "uninitialized value" warning.
            say "Payload: ", pp( b64_decode($json->{payload} // '') );

            say "Sending ack";
            $tx->send({ json => { messageId => $json->{messageId} } });
        });
    });

    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
    exit 1 if $failed;
}

####

use Mojo::Base -strict, -signatures;
use Mojo::UserAgent;
use Mojo::IOLoop;
use Mojo::Util qw/b64_encode/;
use Data::Dump qw/dd pp/;

{
    my $ADDR = 'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic';

    # Set when the handshake fails, so the script can exit non-zero once the
    # event loop has stopped.
    my $failed = 0;

    say "Opening WebSocket...";
    my $ua = Mojo::UserAgent->new;
    $ua->websocket($ADDR => sub ($ua, $tx) {

        # See the consumer above: die() inside this callback would only be
        # turned into a warning by the reactor.
        unless ($tx->is_websocket) {
            my $err    = $tx->error // {};
            my $detail = join ' ', grep { defined } @{$err}{qw/code message/};
            warn "WebSocket handshake failed: ",
                ($detail || 'no error details available'), "\n";
            $failed = 1;
            return;
        }

        $tx->on(finish => sub ($tx, $code, $reason=undef) {
            say "WebSocket closed ", pp($code, $reason);
        });

        $tx->on(json => sub ($tx, $json) {
            say "Received ack: ", pp($json);
            $tx->finish;    # close socket after sending test message
        });

        # The empty second argument to b64_encode suppresses the line breaks
        # it would otherwise insert into the encoded output.
        $tx->send({ json => {
            payload    => b64_encode("Hello World", ''),
            properties => {
                key1 => "value1",
                key2 => "value2",
            },
            context => "1",
        } });
    });

    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
    exit 1 if $failed;
}