PerlMonks  

Re: Apache Pulsar modules

by haukex (Bishop)
on Nov 14, 2020 at 12:37 UTC ( #11123645=note )


in reply to Apache Pulsar modules

Does anyone have or know of Perl modules to publish data to an Apache Pulsar messaging bus?

I don't see any, sorry. However, according to the Pulsar docs:

Pulsar's WebSocket API is meant to provide a simple way to interact with Pulsar using languages that do not have an official client library. Through WebSockets you can publish and consume messages and use all the features available in the Java, Go, Python and C++ client libraries.

Here's a simple producer and consumer implemented using Mojolicious, based on the Node.js examples in the above link:

consumer.pl

    use Mojo::Base -strict, -signatures;
    use Mojo::UserAgent;
    use Mojo::IOLoop;
    use Mojo::Util qw/b64_decode/;
    use Data::Dump qw/dd pp/;

    my $ADDR = 'ws://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub';

    say "Opening WebSocket...";
    my $ua = Mojo::UserAgent->new;
    $ua->websocket($ADDR => sub ($ua, $tx) {
        die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;
        $tx->on(finish => sub ($tx, $code, $reason=undef) {
            say "WebSocket closed ", pp($code, $reason);
        });
        $tx->on(json => sub ($tx, $json) {
            say "Received: ", pp($json);
            say "Payload: ", pp( b64_decode($json->{payload}) );
            say "Sending ack";
            $tx->send({ json => { messageId => $json->{messageId} } });
        });
    });
    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
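If you want to check the per-message logic of the consumer without a running broker, here is a minimal sketch using only core modules (JSON::PP and MIME::Base64 instead of Mojolicious). The helper name handle_frame and the sample frame are my own inventions for illustration. The only fields assumed are messageId and payload, the two the consumer above relies on.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json encode_json);
use MIME::Base64 qw(decode_base64);

# Hypothetical helper (not part of any Pulsar or Mojolicious API):
# takes the raw JSON text of one consumer frame and returns the
# decoded payload plus the JSON text of the ack to send back.
sub handle_frame {
    my ($raw) = @_;
    my $msg     = decode_json($raw);
    my $payload = decode_base64($msg->{payload});   # payload arrives Base64-encoded
    my $ack     = encode_json({ messageId => $msg->{messageId} });
    return ($payload, $ack);
}

# Made-up sample frame; the messageId value is illustrative only.
my $sample = '{"messageId":"CAAQAw==","payload":"SGVsbG8gV29ybGQ="}';
my ($payload, $ack) = handle_frame($sample);
print "Payload: $payload\n";   # Payload: Hello World
print "Ack:     $ack\n";       # Ack:     {"messageId":"CAAQAw=="}
```

Keeping this step in a plain function means the same logic can be called from inside the json event handler and exercised separately in a test.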

producer.pl

    use Mojo::Base -strict, -signatures;
    use Mojo::UserAgent;
    use Mojo::IOLoop;
    use Mojo::Util qw/b64_encode/;
    use Data::Dump qw/dd pp/;

    my $ADDR = 'ws://localhost:8080/ws/v2/producer/persistent/public/default/my-topic';

    say "Opening WebSocket...";
    my $ua = Mojo::UserAgent->new;
    $ua->websocket($ADDR => sub ($ua, $tx) {
        die 'WebSocket handshake failed!'.pp($tx) unless $tx->is_websocket;
        $tx->on(finish => sub ($tx, $code, $reason=undef) {
            say "WebSocket closed ", pp($code, $reason);
        });
        $tx->on(json => sub ($tx, $json) {
            say "Received ack: ", pp($json);
            $tx->finish; # close socket after sending test message
        });
        $tx->send({ json => {
            payload => b64_encode("Hello World", ''),
            properties => {
                key1 => "value1",
                key2 => "value2",
            },
            context => "1",
        } });
    });
    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
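The two scripts differ mainly in the endpoint address and in the frame they send, so both can be built by small helpers. This is a rough sketch with core modules only. The names pulsar_ws_url and producer_frame are hypothetical, and the URL layout is simply copied from the two addresses used above, with "public" as the tenant and "default" as the namespace.

```perl
use strict;
use warnings;
use JSON::PP ();
use MIME::Base64 qw(encode_base64);

# Hypothetical helper: build an endpoint URL following the pattern of
# the producer and consumer addresses in the scripts above.
sub pulsar_ws_url {
    my (%o) = @_;
    my @parts = ($o{role}, 'persistent', $o{tenant}, $o{namespace}, $o{topic});
    push @parts, $o{subscription} if $o{role} eq 'consumer';   # consumers also name a subscription
    return "ws://$o{host}/ws/v2/" . join('/', @parts);
}

# Hypothetical helper: build the JSON text of one producer frame.
# Assumes the payload is already a byte string.
sub producer_frame {
    my ($payload, $properties, $context) = @_;
    my $json = JSON::PP->new->canonical;   # sorted keys, so output is stable
    return $json->encode({
        payload    => encode_base64($payload, ''),   # '' suppresses the trailing newline
        properties => $properties,
        context    => $context,
    });
}

print pulsar_ws_url(
    role => 'consumer', host => 'localhost:8080',
    tenant => 'public', namespace => 'default',
    topic => 'my-topic', subscription => 'my-sub',
), "\n";
print producer_frame('Hello World', { key1 => 'value1' }, '1'), "\n";
```

With Mojolicious you would not need producer_frame, since $tx->send({ json => ... }) does the encoding for you. It is only here to make the wire format visible.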
