use v5.22;
use strictures 2;

use JSON::XS qw(decode_json);
use Data::Dumper 'Dumper';
use Net::Kafka::Consumer;

# Example Kafka consumer: connects to the brokers over SSL, subscribes to
# "my_topic", and dumps every JSON payload it receives to STDOUT.
# Broker errors are reported through error_cb (STDERR) and per-message
# errors are printed to STDOUT.

# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for all options
my $consumer = Net::Kafka::Consumer->new(
    'bootstrap.servers'     => 'server1.com:9094,server2.com:9094,server3.com:9094',
    'group.id'              => 'my_consumer_group',
    'security.protocol'     => 'SSL',
    'ssl.keystore.location' => '/path/to/kafka.client.keystore.p12',
    'ssl.keystore.password' => 'password',
    'ssl.key.password'      => 'password',

    # Invoked by the client on errors; dump both arguments for diagnosis.
    error_cb => sub {
        my ($self, $err, $msg) = @_;
        warn Dumper({ ERR => $err, MSG => $msg });
    },
);

$consumer->subscribe( [ "my_topic" ] );
warn Dumper({ consumer => $consumer });

# Allow Ctrl-C / SIGTERM to end the loop so the consumer can be closed
# cleanly instead of being torn down mid-poll. The flag is checked after
# each poll() returns, so shutdown takes at most one poll timeout.
my $running = 1;
$SIG{$_} = sub { $running = 0 } for qw(INT TERM);

while ($running) {
    # Wait for the next message; poll() is given a timeout of 1000
    # (milliseconds, per the Net::Kafka documentation) and returns a false
    # value when nothing arrived in that window.
    my $msg = $consumer->poll(1000);
    next unless $msg;

    if ( my $err = $msg->err ) {
        say "Error: ", Net::Kafka::Error::to_string($err);
        next;
    }

    # decode_json croaks on malformed (or missing) JSON; a single bad
    # message must not take the whole consumer down, so trap it, report
    # it, and move on to the next message.
    my $payload;
    my $decoded = eval { $payload = decode_json( $msg->payload ); 1 };
    if ( !$decoded ) {
        warn "Skipping message with undecodable JSON payload: $@";
        next;
    }

    say Dumper({ PAYLOAD => $payload });
}

# Leave the consumer group cleanly before the handle is destroyed.
$consumer->close;