Files
MSE-PI-E2EEDA-Plein-de-eeee…/gateway/gateway.py
DjeAvd 8ac5f955e0 feat(gateway): add MQTTS support with TLS and authentication
- Add TLS support via mqtt.Client.tls_set()
- Add username/password authentication
- Password loaded from MQTT_PASSWORD environment variable
- Username and TLS flag read from config.json

Assisted-by: Claude:claude-sonnet-4-6 — guidance on paho-mqtt TLS API
and environment variable pattern for secret management
2026-06-04 12:32:46 +02:00

153 lines
5.3 KiB
Python

import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from bleak import BleakScanner
import paho.mqtt.client as mqtt
# Root logger configuration.
# Switch the level to logging.DEBUG while developing; logging.INFO is the
# setting for normal operation.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
    level=logging.INFO,
)

# Module-level logger shared by everything in this file.
log = logging.getLogger(__name__)
class Gateway:
    """BLE advertising listener and MQTT publisher for Nordic Thingy:52 nodes.

    The gateway scans for BLE advertisements carrying the configured service
    UUID, decodes the key/value sensor payload they contain and republishes
    each reading as JSON on ``{gateway_id}/{thingy_mac}/update``.
    """

    # Advertising payload keys as defined in the firmware specification
    KEY_WINDOW = 0x01
    KEY_HUMIDITY = 0x02
    KEY_TEMP = 0x03
    KEY_CO2 = 0x04

    # Number of value bytes that follow each key byte in the payload
    _VALUE_SIZES = {
        KEY_WINDOW: 1,
        KEY_HUMIDITY: 1,
        KEY_TEMP: 2,
        KEY_CO2: 4,
    }

    # Sensor fields copied from a decoded reading into the MQTT message,
    # in the order they appear in the published JSON object
    _PUBLISHED_FIELDS = ("temp", "humidity", "co2_ppm", "window_open")

    def __init__(self, config: dict):
        """Read the configuration and open the MQTT connection.

        Args:
            config: Parsed configuration. Reads ``gateway_id``,
                ``mqtt.broker``, ``mqtt.port``, ``ble.service_uuid`` and the
                optional ``mqtt.username`` / ``mqtt.tls`` entries. The MQTT
                password is taken from the ``MQTT_PASSWORD`` environment
                variable, never from the config.

        Raises:
            KeyError: If a mandatory configuration entry is missing.
        """
        self.gateway_id = config["gateway_id"]
        self.mqtt_broker = config["mqtt"]["broker"]
        self.mqtt_port = config["mqtt"]["port"]
        # BLE service UUID used to identify Thingy:52 advertising packets
        self.service_uuid = config["ble"]["service_uuid"]
        log.info("Gateway ID : %s", self.gateway_id)
        log.info("MQTT broker: %s:%s", self.mqtt_broker, self.mqtt_port)

        # MQTT client setup
        self.mqttc = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2
        )
        # The broker accepts or rejects the session asynchronously, so the
        # real connection outcome is reported from these callbacks.
        self.mqttc.on_connect = self._on_connect
        self.mqttc.on_disconnect = self._on_disconnect

        # Authentication — username from config, password from environment variable
        username = config["mqtt"].get("username")
        password = os.environ.get("MQTT_PASSWORD")
        if username:
            if password is None:
                log.warning(
                    "MQTT_PASSWORD is not set; authenticating as %s without a password",
                    username,
                )
            self.mqttc.username_pw_set(username, password)
            log.info("MQTT authentication configured for user: %s", username)

        # TLS — enabled if specified in config
        # Required for MQTTS connections (port 8883)
        if config["mqtt"].get("tls", False):
            self.mqttc.tls_set()
            log.info("TLS enabled")

        self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
        self.mqttc.loop_start()
        log.info("MQTT connection initiated, waiting for broker acknowledgement")

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        """Log whether the broker accepted the connection (paho callback API v2)."""
        if reason_code.is_failure:
            log.error("MQTT connection refused by broker: %s", reason_code)
        else:
            log.info("MQTT client connected")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties):
        """Log every loss of the broker connection (paho callback API v2)."""
        log.warning("MQTT client disconnected: %s", reason_code)

    def close(self):
        """Disconnect from the MQTT broker and stop the client's network loop."""
        self.mqttc.disconnect()
        self.mqttc.loop_stop()

    def decode_payload(self, data: bytes) -> dict:
        """Decode key/value pairs from BLE advertising payload.

        Format per firmware spec:
            0x01 : window open  (1 byte, 0 or 1)
            0x02 : humidity     (1 byte, integer %)
            0x03 : temperature  (2 bytes, integer / 10 = degrees C)
            0x04 : CO2 ppm      (4 bytes, integer)

        Multi-byte values are read as little-endian.

        Args:
            data: Raw payload bytes taken from the advertisement.

        Returns:
            Mapping with any of ``window_open``, ``humidity``, ``temp`` and
            ``co2_ppm``. Decoding stops at the first unknown key or truncated
            value; the fields decoded up to that point are still returned.
        """
        result = {}
        offset = 0
        while offset < len(data):
            key = data[offset]
            size = self._VALUE_SIZES.get(key)
            if size is None:
                log.warning("Unknown key 0x%02x at offset %d", key, offset)
                break

            value = data[offset + 1:offset + 1 + size]
            if len(value) < size:
                # A known key whose value runs past the end of the payload is
                # a different fault from an unknown key, so report it as such.
                log.warning(
                    "Truncated value for key 0x%02x at offset %d "
                    "(expected %d byte(s), got %d)",
                    key, offset, size, len(value),
                )
                break

            if key == self.KEY_WINDOW:
                result["window_open"] = bool(value[0])
            elif key == self.KEY_HUMIDITY:
                result["humidity"] = value[0]
            elif key == self.KEY_TEMP:
                # NOTE(review): decoded as unsigned, so sub-zero temperatures
                # cannot be represented — confirm against the firmware spec.
                result["temp"] = int.from_bytes(value, byteorder="little") / 10
            elif key == self.KEY_CO2:
                result["co2_ppm"] = int.from_bytes(value, byteorder="little")

            offset += 1 + size
        return result

    def publish(self, mac: str, data: dict):
        """Publish decoded sensor data to MQTT broker.

        Topic: {gateway_id}/{thingy_mac}/update

        The message is a JSON object holding a UTC ``timestamp`` plus every
        known sensor field present in ``data``.

        Args:
            mac: Address of the node the reading came from.
            data: Decoded reading as returned by ``decode_payload``.
        """
        topic = f"{self.gateway_id}/{mac}/update"
        payload = {
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        for field in self._PUBLISHED_FIELDS:
            if field in data:
                payload[field] = data[field]

        info = self.mqttc.publish(topic, json.dumps(payload))
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            # E.g. the client is currently not connected to the broker.
            log.warning("Publish to %s failed: %s", topic, mqtt.error_string(info.rc))
            return
        log.info("Published to %s : %s", topic, payload)

    def on_device_found(self, device, adv_data):
        """BLE scan callback — filters on Thingy:52 service UUID and decodes payload.

        Advertisements that do not list the configured service UUID are
        ignored. The payload is taken from the first manufacturer-data entry
        if there is one, otherwise from the first service-data entry.

        Args:
            device: Discovered device; only its ``address`` is used.
            adv_data: Advertisement data; ``service_uuids``,
                ``manufacturer_data`` and ``service_data`` are read.
        """
        uuids = [str(u).lower() for u in adv_data.service_uuids]
        if self.service_uuid.lower() not in uuids:
            return

        # Try manufacturer data first, then service data
        raw = None
        if adv_data.manufacturer_data:
            raw = next(iter(adv_data.manufacturer_data.values()))
        elif adv_data.service_data:
            raw = next(iter(adv_data.service_data.values()))
        if raw is None:
            log.debug("%s | no payload data found", device.address)
            return

        data = self.decode_payload(raw)
        if not data:
            log.warning("%s | empty decoded payload", device.address)
            return
        log.debug("%s | decoded: %s", device.address, data)
        self.publish(device.address, data)

    async def run(self):
        """Start the BLE scan and run until cancelled.

        The scanner is stopped and the MQTT client is shut down when the
        coroutine is cancelled or fails.

        NOTE(review): no ``scanning_mode`` is passed to ``BleakScanner``, so
        the library default applies — confirm whether passive scanning was
        intended.
        """
        log.info("BLE scan started, listening for Thingy:52 advertising packets...")
        try:
            async with BleakScanner(detection_callback=self.on_device_found):
                # Nothing to do here: all work happens in the scan callback.
                await asyncio.Event().wait()
        finally:
            self.close()
if __name__ == "__main__":
    # config.json is loaded from the directory that contains this script.
    config_path = os.path.join(os.path.dirname(__file__), "config.json")
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)

    gateway = Gateway(config)
    try:
        asyncio.run(gateway.run())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the gateway; exit without a traceback.
        log.info("Interrupted, shutting down")