# Files
# MSE-PI-E2EEDA-Plein-de-eeee…/gateway/gateway.py
# DjeAvd b3e6b7e1b8 feat(gateway): add battery level support (key 0x05)
# - Add KEY_BATTERY constant (0x05)
# - Update EXPECTED_PAYLOAD_SIZE from 12 to 14 bytes
# - Decode battery level from payload
# - Publish battery field in MQTT JSON payload
# 2026-06-04 12:32:47 +02:00
#
# 192 lines
# 7.2 KiB
# Python

import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from bleak import BleakScanner
import paho.mqtt.client as mqtt
# Logging — switch the level to logging.DEBUG while developing;
# logging.INFO is the level for normal operation.
log = logging.getLogger(__name__)
logging.basicConfig(
    datefmt="%Y-%m-%dT%H:%M:%S",
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
class Gateway:
    """BLE advertising listener and MQTT publisher for Nordic Thingy:52 nodes.

    Manufacturer-data advertisements carrying company ID 0xFFFF are decoded
    as key/value sensor readings and forwarded as JSON to the MQTT topic
    ``{gateway_id}/{device_address}/update``.
    """

    # Advertising payload keys as defined in the firmware specification
    KEY_WINDOW = 0x01
    KEY_HUMIDITY = 0x02
    KEY_TEMP = 0x03
    KEY_CO2 = 0x04
    KEY_BATTERY = 0x05  # Battery level (1 byte, integer %)

    # Sentinel value indicating sensor failure or not ready
    INVALID_VALUE = 0xFFFFFFFF

    # Expected payload size in bytes:
    # 5 keys (1B each) + window(1B) + humidity(1B) + temp(2B) + co2(4B) + battery(1B) = 14 bytes
    EXPECTED_PAYLOAD_SIZE = 14

    # Company ID under which the nodes publish their manufacturer data
    COMPANY_ID = 0xFFFF

    # Width in bytes of the value that follows each known key
    _VALUE_SIZES = {
        KEY_WINDOW: 1,
        KEY_HUMIDITY: 1,
        KEY_TEMP: 2,
        KEY_CO2: 4,
        KEY_BATTERY: 1,
    }

    # Decoded fields copied into the MQTT payload, in publication order
    _PUBLISHED_FIELDS = ("temp", "humidity", "co2_ppm", "window_open", "battery")

    def __init__(self, config: dict):
        """Read the configuration and start the MQTT client.

        Args:
            config: Parsed configuration. Required entries are
                ``gateway_id``, ``mqtt.broker``, ``mqtt.port`` and
                ``ble.service_uuid``; ``mqtt.username`` and ``mqtt.tls``
                are optional.

        Raises:
            KeyError: A required configuration entry is missing.

        The MQTT password is read from the ``MQTT_PASSWORD`` environment
        variable. ``connect()`` is called synchronously, so an unreachable
        broker makes construction fail with the exception raised by the
        MQTT client.
        """
        self.gateway_id = config["gateway_id"]
        self.mqtt_broker = config["mqtt"]["broker"]
        self.mqtt_port = config["mqtt"]["port"]
        # BLE service UUID used to identify Thingy:52 advertising packets.
        # NOTE(review): stored here but not used by any method of this class;
        # filtering is done on company ID and payload size instead.
        self.service_uuid = config["ble"]["service_uuid"]

        log.info("Gateway ID : %s", self.gateway_id)
        log.info("MQTT broker: %s:%s", self.mqtt_broker, self.mqtt_port)

        # MQTT client setup
        self.mqttc = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2
        )

        # Authentication — username from config, password from environment variable.
        # The password is never stored in config files or source code.
        username = config["mqtt"].get("username")
        password = os.environ.get("MQTT_PASSWORD")
        if username:
            if password is None:
                # Make a forgotten environment variable visible instead of
                # only surfacing later as a refused connection.
                log.warning(
                    "MQTT_PASSWORD is not set — connecting as %s without a password",
                    username,
                )
            self.mqttc.username_pw_set(username, password)
            log.info("MQTT authentication configured for user: %s", username)

        # TLS — enabled if specified in config.
        # Required for MQTTS connections (port 8883).
        if config["mqtt"].get("tls", False):
            self.mqttc.tls_set()
            log.info("TLS enabled")

        # Callback to confirm connection to broker
        def on_connect(client, userdata, flags, reason_code, properties):
            if reason_code == 0:
                log.info("Successfully connected to MQTT broker")
            else:
                log.error(
                    "Failed to connect to MQTT broker — reason code: %s",
                    reason_code,
                )

        # Callback to confirm message delivery to broker
        def on_publish(client, userdata, mid, reason_code, properties):
            log.info("Message confirmed by broker — mid: %s", mid)

        self.mqttc.on_connect = on_connect
        self.mqttc.on_publish = on_publish
        self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
        # Network traffic is handled on the client's background thread
        self.mqttc.loop_start()
        log.info("MQTT client connecting...")

    def decode_payload(self, data: bytes) -> dict:
        """Decode key/value pairs from BLE advertising payload.

        Format per firmware spec (no preamble in raw bytes — company_id
        is handled by the BLE stack and not included in manufacturer data):

            0x01 : window open   (1 byte, 0 or 1)
            0x02 : humidity      (1 byte, integer %)
            0x03 : temperature   (2 bytes big-endian, integer / 10 = degrees C)
            0x04 : CO2 ppm       (4 bytes big-endian, integer)
            0x05 : battery level (1 byte, integer %)

        A CO2 value equal to 0xFFFFFFFF indicates sensor failure or not
        ready and is discarded.

        Args:
            data: Raw manufacturer-data bytes.

        Returns:
            Dict with any of the keys ``window_open`` (bool), ``humidity``
            (int), ``temp`` (float), ``co2_ppm`` (int) and ``battery`` (int).
            Decoding stops at the first unknown key or truncated value;
            fields decoded before that point are still returned.
        """
        result = {}
        size = len(data)
        offset = 0  # no preamble to skip — company_id is not in raw bytes
        while offset < size:
            key = data[offset]
            width = self._VALUE_SIZES.get(key)
            if width is None:
                # Unknown key — likely a non-Thingy device, ignore silently
                log.debug("Unknown key 0x%02x at offset %d", key, offset)
                break

            value_end = offset + 1 + width
            if value_end > size:
                # Known key, but the packet ends before its value does
                log.debug(
                    "Truncated value for key 0x%02x at offset %d", key, offset
                )
                break

            raw = int.from_bytes(data[offset + 1:value_end], byteorder="big")
            if key == self.KEY_WINDOW:
                result["window_open"] = bool(raw)
            elif key == self.KEY_HUMIDITY:
                result["humidity"] = raw
            elif key == self.KEY_TEMP:
                # Temperature stored as integer * 10, big-endian.
                # Example: 25.3°C is stored as 253 (0x00FD).
                # NOTE(review): decoded as unsigned — confirm against the
                # firmware spec whether sub-zero temperatures are possible.
                result["temp"] = raw / 10
            elif key == self.KEY_CO2:
                # 0xFFFFFFFF indicates sensor not ready or failed
                if raw != self.INVALID_VALUE:
                    result["co2_ppm"] = raw
                else:
                    log.debug("CO2 sensor not ready — discarding value 0xFFFFFFFF")
            else:  # KEY_BATTERY, the only remaining entry of _VALUE_SIZES
                result["battery"] = raw
            offset = value_end
        return result

    def publish(self, mac: str, data: dict) -> None:
        """Publish decoded sensor data to MQTT broker.

        Topic: {gateway_id}/{thingy_mac}/update

        The JSON payload always carries a UTC ``timestamp`` plus whichever
        of ``temp``, ``humidity``, ``co2_ppm``, ``window_open`` and
        ``battery`` are present in ``data``.

        Args:
            mac: Address of the advertising device, used in the topic.
            data: Decoded fields as returned by ``decode_payload``.
        """
        topic = f"{self.gateway_id}/{mac}/update"
        payload = {
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        }
        payload.update(
            (field, data[field]) for field in self._PUBLISHED_FIELDS if field in data
        )

        info = self.mqttc.publish(topic, json.dumps(payload))
        if info.rc != mqtt.MQTT_ERR_SUCCESS:
            # The client refused the message (e.g. not connected) — do not
            # report it as published.
            log.warning("Publish to %s failed — rc: %s", topic, info.rc)
            return
        log.info("Published to %s : %s", topic, payload)

    def on_device_found(self, device, adv_data) -> None:
        """BLE scan callback — filters on company_id 0xffff in manufacturer data.

        Advertisements without that company ID, with an unexpected payload
        size, or that decode to nothing are dropped; everything else is
        published under the device's address.
        """
        raw = adv_data.manufacturer_data.get(self.COMPANY_ID)
        if raw is None:
            return

        # Filter on exact payload size to avoid false positives from
        # other BLE devices using the same company_id
        if len(raw) != self.EXPECTED_PAYLOAD_SIZE:
            log.debug(
                "%s | ignored — unexpected payload size: %d",
                device.address,
                len(raw),
            )
            return

        # This callback runs for every matching advertisement; only build
        # the byte list when it will actually be logged.
        if log.isEnabledFor(logging.DEBUG):
            log.debug("%s | Thingy detected, raw: %s", device.address, list(raw))

        data = self.decode_payload(raw)
        if not data:
            log.debug("%s | empty decoded payload — ignored", device.address)
            return

        log.debug("%s | decoded: %s", device.address, data)
        self.publish(device.address, data)

    async def run(self) -> None:
        """Start the BLE scan and run until cancelled.

        The scanner is created with Bleak's default settings.
        NOTE(review): Bleak's default scanning mode is presumably "active";
        pass ``scanning_mode="passive"`` explicitly if a passive scan is
        required and the platform supports it.

        When the task is cancelled or fails, the scanner is stopped and the
        MQTT client is disconnected so neither is left running.
        """
        log.info("BLE scan started, listening for Thingy:52 advertising packets...")
        scanner = BleakScanner(detection_callback=self.on_device_found)
        await scanner.start()
        try:
            while True:
                await asyncio.sleep(1)
        finally:
            await scanner.stop()
            # Disconnect first so the network thread can exit, then join it
            self.mqttc.disconnect()
            self.mqttc.loop_stop()
if __name__ == "__main__":
    # config.json is expected to sit next to this script
    config_path = os.path.join(os.path.dirname(__file__), "config.json")
    # JSON is UTF-8 by specification — do not depend on the locale's
    # default encoding when reading it.
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)
    gateway = Gateway(config)
    try:
        asyncio.run(gateway.run())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the gateway — exit without a traceback
        log.info("Gateway stopped by user")