perlquestion
cLive ;-)
<p>I have a requirement to connect to a Kafka server over SSL. I have the certs, private key, and password, but I can't work out how to even test a connection. The Kafka modules use <code>Socket</code> to create the connection and don't support SSL.</p>
<p>I've started playing with <code>IO::Socket::SSL</code> to try to connect to the server, but because of how Kafka::IO is written, I think I'll have to work out how to do this at the lower-level <code>Socket</code> interface even if I manage to connect with <code>IO::Socket::SSL</code>. I know very little about socket coding, nor why the decision was made to use <code>Socket</code> over <code>IO::Socket</code>, so this feels like a very steep learning curve. So far, I've managed to create the socket and I can send data to the server, but I'm guessing the server treats it as gibberish, because it closes the socket straight away. Or possibly I'm not doing whatever I need to do with the password to get a valid response.</p>
<p>Does anyone have any pointers that could get me going in the right direction? I did drag out my old copy of Stein's Network Programming with Perl, but I don't see any references to SSL in it.</p>
<p>I get no errors when running this at debug level 3:</p>
<code>use IO::Socket::SSL;    # exports SSL_VERIFY_NONE and $SSL_ERROR

my $socket = IO::Socket::SSL->new(
    PeerAddr        => $host,
    PeerPort        => 9094,
    SSL_verify_mode => SSL_VERIFY_NONE,    # no certificate verification; testing only
    SSL_key_file    => './key.pem',
    SSL_cert_file   => './cert.pem',
) or die "failed to connect: $SSL_ERROR";</code>
<p>but that doesn't include using the password (I'm not sure whether that's for the SSL level or Kafka level ATM). Is there something I can print to the socket to get metadata from the server?</p>
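<p>If the password turns out to protect the private key (an assumption; it may equally belong to a keystore or to Kafka itself), <code>IO::Socket::SSL</code> accepts an <code>SSL_passwd_cb</code> callback that returns it. A minimal sketch, where <code>ssl_client_args</code> and its option names are hypothetical helpers of my own, not part of any module:</p>

```perl
use strict;
use warnings;

# Hypothetical helper: gathers the IO::Socket::SSL constructor arguments
# in one place so the key password is supplied via SSL_passwd_cb.
sub ssl_client_args {
    my (%opt) = @_;
    return (
        PeerAddr      => $opt{host},
        PeerPort      => $opt{port},
        SSL_key_file  => $opt{key_file},
        SSL_cert_file => $opt{cert_file},
        # IO::Socket::SSL calls this when the private key is encrypted
        SSL_passwd_cb => sub { return $opt{key_password} },
    );
}

# Usage (needs IO::Socket::SSL installed and a reachable broker):
#   use IO::Socket::SSL;
#   my $socket = IO::Socket::SSL->new(
#       ssl_client_args(
#           host => $host, port => 9094,
#           key_file => './key.pem', cert_file => './cert.pem',
#           key_password => $password,
#       ),
#       SSL_verify_mode => SSL_VERIFY_NONE,   # testing only
#   ) or die "failed to connect: $SSL_ERROR";
```

<p>If the key is not encrypted, the callback should simply never be invoked, so passing it is harmless.</p>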
<p>I tried looking for info on the raw socket commands that Kafka uses, but I'm only finding API docs right now :/</p>
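<p>For reference, Kafka speaks a length-prefixed binary protocol rather than text commands, which would explain why printing arbitrary data gets the socket closed. Below is a rough sketch of the simplest probe I know of, an ApiVersions request (API key 18, version 0, empty body), built with <code>pack</code>. The function names and the client id are my own placeholders, and the byte layout is my reading of the Kafka protocol guide, so treat it as an assumption to check rather than a reference implementation:</p>

```perl
use strict;
use warnings;

# Build an ApiVersions v0 request frame:
#   int32 size | int16 api_key | int16 api_version | int32 correlation_id
#   | int16 client_id length + client_id bytes | (empty body)
sub api_versions_request {
    my ($correlation_id, $client_id) = @_;
    my $header = pack('n n N n/a*', 18, 0, $correlation_id, $client_id);
    return pack('N/a*', $header);    # prepend the 4-byte big-endian size
}

# Decode the start of a response payload (the bytes after the 4-byte size):
#   int32 correlation_id | int16 error_code (signed; 0 means no error)
sub parse_response_header {
    my ($payload) = @_;
    my ($correlation_id, $error_code) = unpack('N s>', $payload);
    return ($correlation_id, $error_code);
}

# Usage over an already connected IO::Socket::SSL handle (not run here):
#   print {$socket} api_versions_request(1, 'perl-probe');
#   read($socket, my $size_bytes, 4) or die "no response";
#   my $size = unpack('N', $size_bytes);
#   read($socket, my $payload, $size);   # may need a loop for short reads
#   my ($id, $err) = parse_response_header($payload);
```

<p>A response whose correlation id matches the one sent, with error code 0, would at least show that the TLS layer and the Kafka framing are both working.</p>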
<p>All pointers welcomed...</p>
<h3>update</h3>
Got it working with <b>Net::Kafka</b> after following the pointers below. For future reference for those that need it, here's the code I used:
<code>
use v5.22;
use strictures 2;
use JSON::XS qw(decode_json);
use Data::Dumper 'Dumper';
use Net::Kafka::Consumer;

my $consumer = Net::Kafka::Consumer->new(
    'bootstrap.servers'     => 'server1.com:9094,server2.com:9094,server3.com:9094',
    'group.id'              => 'my_consumer_group',
    'security.protocol'     => 'SSL',
    'ssl.keystore.location' => '/path/to/kafka.client.keystore.p12',
    'ssl.keystore.password' => 'password',
    'ssl.key.password'      => 'password',
    error_cb                => sub {
        my ($self, $err, $msg) = @_;
        warn Dumper({ ERR => $err, MSG => $msg });
    },
);
# see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md for all options

$consumer->subscribe( [ "my_topic" ] );
warn Dumper({ consumer => $consumer });

while (1) {
    my $msg = $consumer->poll(1000);
    if ($msg) {
        if ( my $err = $msg->err ) {
            say "Error: ", Net::Kafka::Error::to_string($err);
        }
        else {
            say Dumper({ PAYLOAD => decode_json $msg->payload });
        }
    }
}
</code>
The module is sparsely documented, so I ended up doing a little hit and miss testing to get here.