Files
MSE-PI-E2EEDA-Plein-de-eeee…/gateway/gateway.py
DjeAvd 2a7546fe8b feat(gateway): implement BLE-to-MQTT gateway
Implement Gateway class that discovers Nordic Thingy:52 nodes via BLE
and publishes sensor data to MQTT broker on each notification received.

- Automatic node discovery via BLE service UUID (ef680100)
- GATT notifications for temperature, humidity and CO2
- Publish immediately on reception to {gateway_id}/{mac}/update
- Connection timeout to avoid blocking on unreachable nodes
- Disconnection detection and automatic reintegration into scan
- Logging with DEBUG/INFO/WARNING/ERROR levels

Assisted-by: Claude:claude-sonnet-4-6 — debugging BLE parallel connections (BlueZ InProgress error), GATT UUID discovery (ef680100 vs ef680200), byte decoding for temperature/humidity/CO2, async connection timeout implementation
2026-06-04 12:32:45 +02:00

161 lines
6.3 KiB
Python

import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from bleak import BleakClient, BleakScanner
import paho.mqtt.client as mqtt
# Logging — use DEBUG for development, INFO for normal operation
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%dT%H:%M:%S"
)
log = logging.getLogger(__name__)
class Gateway:
"""BLE to MQTT gateway for Nordic Thingy:52 sensor nodes."""
def __init__(self, config: dict):
self.gateway_id = config["gateway_id"]
self.mqtt_broker = config["mqtt"]["broker"]
self.mqtt_port = config["mqtt"]["port"]
# GATT UUIDs loaded from config
# ef680100 is advertised in BLE packets — used for discovery
# environment service (ef680200) is only accessible after connection
self.service_uuid = config["ble"]["service_uuid"]
self.uuid_temp = config["ble"]["characteristics"]["temperature"]
self.uuid_co2 = config["ble"]["characteristics"]["co2"]
self.uuid_humidity= config["ble"]["characteristics"]["humidity"]
# Runtime state
# latest : last known sensor values per MAC address
# connecting: MACs currently being connected (avoids duplicate attempts)
self.latest = {}
self.connecting = set()
self.scanner = None
# MQTT client setup
self.mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
self.mqttc.loop_start()
log.info(f"Gateway ID : {self.gateway_id}")
log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}")
log.info("MQTT client connected")
# Publish immediately on reception — topic: {gateway_id}/{mac}/update
# Only include fields that have been received at least once
def publish(self, mac: str, data: dict):
topic = f"{self.gateway_id}/{mac}/update"
payload = {"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")}
if data.get("temp") is not None:
payload["temp"] = data.get("temp")
if data.get("humidity") is not None:
payload["humidity"] = data.get("humidity")
if data.get("co2") is not None:
payload["co2_ppm"] = data.get("co2")
self.mqttc.publish(topic, json.dumps(payload))
log.info(f"Published to {topic} : {payload}")
# BLE handlers — called by bleak on each notification
# Temp : 2 bytes (integer part + decimal part)
# CO2 : uint16 little-endian, 0 means sensor is warming up
# Humidity: 1 byte, direct percentage value
def handle_temp(self, mac: str, sender, data: bytearray):
temp = data[0] + data[1] / 100
if mac in self.latest:
self.latest[mac]["temp"] = round(temp, 2)
self.publish(mac, self.latest[mac])
def handle_humidity(self, mac: str, sender, data: bytearray):
if mac in self.latest:
self.latest[mac]["humidity"] = data[0]
self.publish(mac, self.latest[mac])
def handle_co2(self, mac: str, sender, data: bytearray):
co2 = int.from_bytes(data[0:2], byteorder='little')
if co2 == 0:
log.warning(f"{mac} | CO2 sensor still warming up")
return
if mac in self.latest:
self.latest[mac]["co2"] = co2
self.publish(mac, self.latest[mac])
# Stop scanner before connecting — BlueZ does not support both simultaneously.
# Scanner is restarted once the device is connected and notifications registered.
async def connect_device(self, mac: str):
log.info(f"Attempting to connect to {mac}...")
try:
if self.scanner:
await self.scanner.stop()
await asyncio.sleep(1)
log.info(f"Connecting to {mac}...")
def on_disconnect(client):
log.warning(f"{mac} disconnected, removing from active nodes")
self.latest.pop(mac, None)
client = BleakClient(mac, disconnected_callback=on_disconnect)
try:
await asyncio.wait_for(client.connect(), timeout=10.0)
except asyncio.TimeoutError:
log.error(f"Connection timeout for {mac}")
self.connecting.discard(mac)
self.latest.pop(mac, None)
if self.scanner:
await self.scanner.start()
return
self.connecting.discard(mac)
self.latest[mac] = {"temp": None, "humidity": None, "co2": None}
log.info(f"{mac} connected")
await client.start_notify(self.uuid_temp,
lambda s, d: self.handle_temp(mac, s, d))
await client.start_notify(self.uuid_co2,
lambda s, d: self.handle_co2(mac, s, d))
await client.start_notify(self.uuid_humidity,
lambda s, d: self.handle_humidity(mac, s, d))
await asyncio.sleep(1)
await self.scanner.start()
log.info("Scanner restarted, waiting for additional nodes")
except Exception as e:
log.error(f"Connection error for {mac}: {e}")
self.connecting.discard(mac)
self.latest.pop(mac, None)
if self.scanner:
await self.scanner.start()
# BLE discovery callback — filters on Thingy:52 service UUID
def on_device_found(self, device, adv_data):
uuids = [str(u).lower() for u in adv_data.service_uuids]
if self.service_uuid.lower() in uuids \
and device.address not in self.latest \
and device.address not in self.connecting:
self.connecting.add(device.address)
log.info(f"New node detected: {device.address}")
asyncio.ensure_future(self.connect_device(device.address))
# Start BLE scan and run indefinitely
async def run(self):
log.info("BLE scan started, waiting for Thingy:52 nodes...")
self.scanner = BleakScanner(detection_callback=self.on_device_found)
await self.scanner.start()
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
config_path = os.path.join(os.path.dirname(__file__), "config.json")
with open(config_path, "r") as f:
config = json.load(f)
gateway = Gateway(config)
asyncio.run(gateway.run())