Files
MSE-PI-E2EEDA-Plein-de-eeee…/gateway/gateway.py
DjeAvd 4de21a08f8 fix(gateway): use os._exit instead of SystemExit for MQTT error handling
SystemExit only stops the paho-mqtt background thread, leaving the main
asyncio loop running silently. os._exit kills the entire process so
systemd can restart the gateway automatically.
2026-06-04 12:32:48 +02:00

257 lines
10 KiB
Python

import asyncio
import json
import logging
import os
import time
from datetime import datetime, timezone

import paho.mqtt.client as mqtt
from bleak import BleakScanner
# Logging — INFO for normal operation. For development, set the LOG_LEVEL
# environment variable (e.g. LOG_LEVEL=DEBUG) instead of editing this file.
# An unrecognised level name falls back to INFO rather than aborting startup.
_log_level = logging.getLevelName(os.environ.get("LOG_LEVEL", "INFO").strip().upper())
if not isinstance(_log_level, int):
    # getLevelName() returns a "Level <name>" string for unknown names
    _log_level = logging.INFO
logging.basicConfig(
    level=_log_level,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
log = logging.getLogger(__name__)
class Gateway:
    """BLE advertising listener and MQTT publisher for Nordic Thingy:52 nodes.

    Advertisements carrying manufacturer data under company_id 0xffff with
    an exact payload size of EXPECTED_PAYLOAD_SIZE bytes are decoded and
    published as JSON to ``{gateway_id}/{mac}/update``.

    Any MQTT failure (refused connection, disconnection, failed publish)
    terminates the whole process with ``os._exit(1)`` so that systemd can
    restart the gateway.
    """

    # Advertising payload keys as defined in the firmware specification
    KEY_WINDOW = 0x01
    KEY_HUMIDITY = 0x02
    KEY_TEMP = 0x03
    KEY_CO2 = 0x04
    KEY_BATTERY = 0x05  # Battery level (1 byte, integer %)

    # Size in bytes of the big-endian value that follows each key
    _VALUE_SIZES = {
        KEY_WINDOW: 1,
        KEY_HUMIDITY: 1,
        KEY_TEMP: 2,
        KEY_CO2: 4,
        KEY_BATTERY: 1,
    }

    # Sentinel value indicating sensor failure or not ready
    INVALID_VALUE = 0xFFFFFFFF

    # Expected payload size in bytes:
    # 5 keys (1B each) + window(1B) + humidity(1B) + temp(2B) + co2(4B) + battery(1B) = 14 bytes
    EXPECTED_PAYLOAD_SIZE = 14

    # Deduplication window in seconds — ignore packets from the same node
    # within this period to avoid duplicate entries in the database.
    # Each node advertises 3-4 times per frame on BLE channels 37, 38 and 39.
    DEDUP_WINDOW_SECONDS = 10

    def __init__(self, config: dict):
        """Read configuration, set up the MQTT client and start its network loop.

        Args:
            config: parsed config with ``gateway_id``, ``mqtt.broker``,
                ``mqtt.port``, optional ``mqtt.username`` / ``mqtt.tls`` and
                ``ble.service_uuid``.

        The MQTT password is read from the MQTT_PASSWORD environment variable.
        ``connect()`` is called synchronously, so an unreachable broker raises
        here; the paho network loop then runs in a background thread.
        """
        self.gateway_id = config["gateway_id"]
        self.mqtt_broker = config["mqtt"]["broker"]
        self.mqtt_port = config["mqtt"]["port"]
        # BLE service UUID from config. Not used by this class for filtering:
        # on_device_found filters on company_id and payload size instead.
        self.service_uuid = config["ble"]["service_uuid"]

        # Deduplication cache — MAC address -> time.monotonic() timestamp of
        # the last published packet for that node.
        self._last_published = {}

        log.info(f"Gateway ID : {self.gateway_id}")
        log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}")

        # MQTT client setup
        self.mqttc = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2
        )

        # Authentication — username from config, password from environment variable.
        # The password is never stored in config files or source code.
        username = config["mqtt"].get("username")
        password = os.environ.get("MQTT_PASSWORD")
        if username:
            if password is None:
                # Surface the misconfiguration instead of only seeing a refused connection
                log.warning("MQTT_PASSWORD is not set — authenticating without a password")
            self.mqttc.username_pw_set(username, password)
            log.info(f"MQTT authentication configured for user: {username}")

        # TLS — enabled if specified in config (required for MQTTS, port 8883)
        if config["mqtt"].get("tls", False):
            self.mqttc.tls_set()
            log.info("TLS enabled")

        # The callbacks below run in the paho-mqtt background thread. os._exit(1)
        # is used instead of SystemExit because SystemExit would only stop that
        # thread, leaving the main asyncio loop running silently; killing the
        # process lets systemd restart the gateway.
        def on_connect(client, userdata, flags, reason_code, properties):
            if reason_code == 0:
                log.info("Successfully connected to MQTT broker")
            else:
                log.error(f"Failed to connect to MQTT broker — reason code: {reason_code}")
                os._exit(1)

        def on_disconnect(client, userdata, flags, reason_code, properties):
            log.error(f"Disconnected from MQTT broker — reason code: {reason_code}")
            log.error("Exiting — systemd will restart the gateway")
            os._exit(1)

        # Confirms message delivery to the broker
        def on_publish(client, userdata, mid, reason_code, properties):
            log.info(f"Message confirmed by broker — mid: {mid}")

        self.mqttc.on_connect = on_connect
        self.mqttc.on_disconnect = on_disconnect
        self.mqttc.on_publish = on_publish

        self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
        self.mqttc.loop_start()
        log.info("MQTT client connecting...")

    def decode_payload(self, data: bytes) -> dict:
        """Decode key/value pairs from a BLE advertising payload.

        Format per firmware spec (no preamble in raw bytes — company_id
        is handled by the BLE stack and not included in manufacturer data):
            0x01 : window open (1 byte, 0 or 1)
            0x02 : humidity (1 byte, integer %)
            0x03 : temperature (2 bytes big-endian, integer / 10 = degrees C)
            0x04 : CO2 ppm (4 bytes big-endian, integer)
            0x05 : battery level (1 byte, integer %)

        A CO2 value equal to 0xFFFFFFFF indicates sensor failure or not
        ready and is discarded. Decoding stops at the first unknown key or
        truncated value; fields decoded before that point are returned.

        Returns:
            dict with any of ``window_open``, ``humidity``, ``temp``,
            ``co2_ppm`` and ``battery``; empty if nothing could be decoded.
        """
        result = {}
        i = 0  # no preamble to skip — company_id is not in raw bytes
        while i < len(data):
            key = data[i]
            size = self._VALUE_SIZES.get(key)
            if size is None:
                # Unknown key — likely a non-Thingy device, stop decoding
                log.debug(f"Unknown key 0x{key:02x} at offset {i}")
                break
            start = i + 1
            end = start + size
            if end > len(data):
                log.debug(f"Truncated value for key 0x{key:02x} at offset {i}")
                break
            value = int.from_bytes(data[start:end], byteorder="big")
            i = end

            if key == self.KEY_WINDOW:
                result["window_open"] = bool(value)
            elif key == self.KEY_HUMIDITY:
                result["humidity"] = value
            elif key == self.KEY_TEMP:
                # Temperature stored as integer * 10, e.g. 25.3°C -> 253 (0x00FD).
                # NOTE(review): decoded as unsigned, as in the original code;
                # confirm how the firmware encodes negative temperatures.
                result["temp"] = value / 10
            elif key == self.KEY_CO2:
                # 0xFFFFFFFF indicates sensor not ready or failed
                if value != self.INVALID_VALUE:
                    result["co2_ppm"] = value
                else:
                    log.debug("CO2 sensor not ready — discarding value 0xFFFFFFFF")
            else:  # KEY_BATTERY
                result["battery"] = value
        return result

    def is_duplicate(self, mac: str) -> bool:
        """Return True if a packet from ``mac`` was accepted within DEDUP_WINDOW_SECONDS.

        Each Thingy:52 advertises 3-4 times per broadcast frame on BLE
        advertising channels 37, 38 and 39; without deduplication the same
        measurement would be published several times.

        Side effect: when the packet is NOT a duplicate, the current time is
        recorded for ``mac``. A monotonic clock is used so that wall-clock
        adjustments (e.g. NTP corrections) cannot suppress or double publishing.
        """
        now = time.monotonic()
        last = self._last_published.get(mac)
        if last is not None:
            elapsed = now - last
            if elapsed < self.DEDUP_WINDOW_SECONDS:
                log.debug(f"{mac} | duplicate ignored — last published {elapsed:.1f}s ago")
                return True
        self._last_published[mac] = now
        return False

    def publish(self, mac: str, data: dict):
        """Publish decoded sensor data to the MQTT broker.

        Topic: {gateway_id}/{thingy_mac}/update
        The JSON payload holds a UTC ``timestamp`` plus whichever sensor fields
        are present in ``data``. If mqttc.publish() reports an error, the process
        exits so systemd can restart the gateway.
        """
        topic = f"{self.gateway_id}/{mac}/update"
        payload = {
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        # Copy only the known sensor fields, in a fixed order
        for field in ("temp", "humidity", "co2_ppm", "window_open", "battery"):
            if field in data:
                payload[field] = data[field]

        result = self.mqttc.publish(topic, json.dumps(payload))
        # rc == MQTT_ERR_SUCCESS means the message was queued; any other value
        # indicates an error (e.g. connection lost)
        if result.rc != mqtt.MQTT_ERR_SUCCESS:
            log.error(f"Failed to publish to {topic} — rc: {result.rc}")
            log.error("Exiting — systemd will restart the gateway")
            os._exit(1)
        log.info(f"Published to {topic} : {payload}")

    def on_device_found(self, device, adv_data):
        """BLE scan callback — filter, deduplicate, decode and publish one advertisement.

        Only manufacturer data under company_id 0xffff with exactly
        EXPECTED_PAYLOAD_SIZE bytes is considered.
        """
        raw = adv_data.manufacturer_data.get(0xffff)
        if raw is None:
            return

        # Filter on exact payload size to avoid false positives from
        # other BLE devices using the same company_id
        if len(raw) != self.EXPECTED_PAYLOAD_SIZE:
            log.debug(f"{device.address} | ignored — unexpected payload size: {len(raw)}")
            return

        # Deduplication — ignore packets from the same node within the dedup window
        if self.is_duplicate(device.address):
            return

        log.debug(f"{device.address} | Thingy detected, raw: {list(raw)}")
        data = self.decode_payload(raw)
        if not data:
            log.debug(f"{device.address} | empty decoded payload — ignored")
            return

        log.debug(f"{device.address} | decoded: {data}")
        self.publish(device.address, data)

    async def run(self):
        """Start the BLE scan and run until the task is cancelled.

        bleak's default scanning mode (active) is used. The ``bluez`` argument
        requests interval = window = 0x0010 (10 ms) so that the short Thingy:52
        advertising window is not missed.

        NOTE(review): bleak documents the BlueZ-specific args (BlueZScannerArgs)
        as ``filters`` and ``or_patterns`` only. The ``interval``/``window`` keys
        do not appear to be applied — confirm. If a continuous scan is required,
        it may have to be configured at the adapter/BlueZ level.

        The scanner is stopped on cancellation (e.g. Ctrl+C through
        asyncio.run) so discovery is not left running on the adapter.
        """
        scanner = BleakScanner(
            detection_callback=self.on_device_found,
            bluez={"interval": 0x0010, "window": 0x0010},
        )
        await scanner.start()
        log.info("BLE scan started, listening for Thingy:52 advertising packets...")
        try:
            while True:
                await asyncio.sleep(1)
        finally:
            await scanner.stop()
if __name__ == "__main__":
    # config.json lives next to this script
    config_path = os.path.join(os.path.dirname(__file__), "config.json")
    # JSON is UTF-8 by specification; don't depend on the system locale
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)
    gateway = Gateway(config)
    asyncio.run(gateway.run())