#!/usr/bin/perl
#
# Bridge between a JeeLink (attached to a serial port) and an MQTT server.
#
# Two threads are started:
#   * mqtt()   subscribes to MQTT topics, converts each message into a JeeLink
#              command string and pushes it onto a shared queue;
#   * serial() reads "OK ..." lines from the JeeLink, decodes them, publishes
#              the result as JSON over MQTT, and writes queued commands to the
#              JeeLink.

use strict;
use warnings;

use threads;
use threads::shared;
use JSON;
use WebSphere::MQTT::Client;
use Device::SerialPort;
use Time::HiRes qw(time sleep);

# Jeelink port and speed
my $jeelink_port = '/dev/ttyUSB0';

# mqtt configuration
my $mqtt_hostname = 'localhost';
my $mqtt_port     = 1883;

# Seconds to sleep when the serial loop had nothing to do, so that polling
# the port does not keep a CPU core permanently busy.
my $poll_interval = 0.01;

# this array is the queue of commands to be written to the JeeLink
my @queue : shared = ();

# Build the JeeLink command for a decoded TIME message.
#
# Takes the decoded message (hash ref with hour, minute and second) and
# returns the command string "<hour>,<minute>,<second>,0s".
sub _time_command {
    my ($obj) = @_;

    # pack the data into binary (each field is reduced to one unsigned byte)
    my $binary = pack( "C C C", $obj->{hour}, $obj->{minute}, $obj->{second} );

    # convert binary into a command to the Jeelink
    return join( ',', unpack( "C*", $binary ) ) . ',0s';
}

# Decode one "OK ..." line received from the JeeLink.
#
# Takes the raw line and returns the list (topic, hash ref) for a known node,
# or an empty list when the node id is unknown or the packet is too short to
# hold every field of that node type.
sub _decode_packet {
    my ($line) = @_;

    my @words = split( ' ', $line );
    shift(@words);    # shift OK
    return unless @words;

    # create the packet; shift and mask the node id
    my $obj    = { id => shift(@words) & 0x1F };
    my $binary = pack( "C*", @words );

    # decode a message received from the energy meter: six native floats
    if ( $obj->{id} == 0x05 ) {
        return if length($binary) < length( pack( "f f f f f f", (0) x 6 ) );
        @{$obj}{qw(realpower apparentpower powerfactor vrms irms netfrequency)}
            = unpack( "f f f f f f", $binary );
        return ( 'POWERMETER', $obj );
    }

    # decode a message received from a room node
    if ( $obj->{id} == 0x03 ) {
        return if length($binary) < length( pack( "C C s", 0, 0, 0 ) );
        my ( $light, $humidity, $temperature ) = unpack( "C C s", $binary );
        $obj->{light} = $light;

        # the second byte carries the motion flag in bit 0 and the humidity
        # in the remaining bits
        $obj->{moved}    = $humidity & 1;
        $obj->{humidity} = $humidity >> 1;

        # NOTE(review): only the low 10 bits are kept, so a negative value
        # loses its sign here - confirm that this is intended.
        $obj->{temperature} = $temperature & 0x03FF;
        return ( 'ROOMNODE', $obj );
    }

    return;
}

# the thread that subscribes to MQTT messages
sub mqtt {
    my $mqtta = WebSphere::MQTT::Client->new(
        Hostname => $mqtt_hostname,
        Port     => $mqtt_port,
        clientid => 'rf12_received_mqtt',
    );

    # connect to mqtt server
    my $res = $mqtta->connect();
    die "Failed to connect: $res\n" if ($res);

    # Subscribe to topic to be forwarded to the JeeLink, multiple topics can
    # be added here
    $mqtta->subscribe('TIME');

    for ( ;; ) {
        my ( $topic, $payload ) = $mqtta->receivePub();

        # Receiving nothing at all still ends this thread, as it did before
        # (decoding an undefined payload died).
        die "No payload received from MQTT server\n" unless defined $payload;

        # A payload that is not a JSON object must not take down the
        # subscriber: report it and wait for the next message.
        my $obj = eval { from_json($payload) };
        if ( ref($obj) ne 'HASH' ) {
            warn "Ignoring malformed MQTT message\n";
            next;
        }

        my $msg;
        if ( defined $topic && $topic eq 'TIME' ) {
            $msg = _time_command($obj);
        }

        # encodes for other topics can be added here

        # push the message to be sent to the JeeLink on the queue
        if ( defined $msg ) {
            lock(@queue);
            push( @queue, $msg );
        }
    }
}

# the thread that talks to the JeeLink
sub serial {

    # create an MQTT server reference
    my $mqtt = WebSphere::MQTT::Client->new(
        Hostname => $mqtt_hostname,
        Port     => $mqtt_port,
        clientid => 'rf12_sender_mqtt',
    );

    # connect to MQTT server
    my $res = $mqtt->connect();
    die "Failed to connect: $res\n" if ($res);

    # A ping is sent to the MQTT server each five seconds.
    # $ts is used for keeping track of that
    my $ts = time();

    # Set up the serial port; new() returns nothing when the port cannot be
    # opened, so fail with a clear message instead of a method call on undef
    my $port = Device::SerialPort->new($jeelink_port)
        or die "Failed to open serial port $jeelink_port: $!\n";
    $port->databits(8);
    $port->baudrate(57600);
    $port->parity("none");
    $port->stopbits(1);

    # start endless loop
    for ( ;; ) {
        my $did_work = 0;

        # look for data on the serial port
        my $recv = $port->lookfor();
        if ($recv) {
            $did_work = 1;

            # we're only interested in good messages
            if ( $recv =~ m/^OK / ) {
                my ( $mqtt_topic, $obj ) = _decode_packet($recv);

                # send the message as mqtt message encoded in JSON
                if ($mqtt_topic) {
                    $mqtt->publish( to_json($obj), $mqtt_topic );

                    # record the time we sent the last message
                    $ts = time();
                }
            }
        }

        # if no message was sent each 5 seconds, just send a status message
        # to keep us connected to the server
        if ( time() - $ts > 5 ) {
            $mqtt->status();
            $ts = time();
        }

        # check for new packets on the queue; the lock is held only while
        # the queue itself is touched
        my $msg;
        {
            lock(@queue);
            $msg = shift(@queue) if @queue;
        }

        # write the message to the serial port
        if ($msg) {
            $port->write( $msg . "\n" );
            $did_work = 1;
        }

        # nothing received and nothing sent: yield briefly instead of
        # polling the port in a tight loop
        sleep($poll_interval) unless $did_work;
    }
}

# start both threads
my $thr1 = threads->new( \&mqtt );
my $thr2 = threads->new( \&serial );

$thr1->join;
$thr2->join;