refactor(gateway): switch from GATT connection to passive BLE advertising
Replace active GATT connection with passive BLE advertising scan. The gateway now listens to advertising packets and decodes the key/value payload defined in the firmware specification (keys 0x01 to 0x04). - Remove GATT connection logic (connect_device, BleakClient) - Add decode_payload method following firmware spec - Fix mqtt.Client instantiation with callback_api_version keyword argument - Add window_open field support in MQTT payload Assisted-by: Claude:claude-sonnet-4-6 — payload decoding implementation
This commit is contained in:
@@ -3,7 +3,7 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from bleak import BleakClient, BleakScanner
|
from bleak import BleakScanner
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
# Logging — use DEBUG for development, INFO for normal operation
|
# Logging — use DEBUG for development, INFO for normal operation
|
||||||
@@ -16,137 +16,115 @@ log = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
class Gateway:
|
class Gateway:
|
||||||
"""BLE to MQTT gateway for Nordic Thingy:52 sensor nodes."""
|
"""BLE advertising listener and MQTT publisher for Nordic Thingy:52 nodes."""
|
||||||
|
|
||||||
|
# Advertising payload keys as defined in the firmware specification
|
||||||
|
KEY_WINDOW = 0x01
|
||||||
|
KEY_HUMIDITY = 0x02
|
||||||
|
KEY_TEMP = 0x03
|
||||||
|
KEY_CO2 = 0x04
|
||||||
|
|
||||||
def __init__(self, config: dict):
|
def __init__(self, config: dict):
|
||||||
self.gateway_id = config["gateway_id"]
|
self.gateway_id = config["gateway_id"]
|
||||||
self.mqtt_broker = config["mqtt"]["broker"]
|
self.mqtt_broker = config["mqtt"]["broker"]
|
||||||
self.mqtt_port = config["mqtt"]["port"]
|
self.mqtt_port = config["mqtt"]["port"]
|
||||||
|
|
||||||
# GATT UUIDs loaded from config
|
# BLE service UUID used to identify Thingy:52 advertising packets
|
||||||
# ef680100 is advertised in BLE packets — used for discovery
|
|
||||||
# environment service (ef680200) is only accessible after connection
|
|
||||||
self.service_uuid = config["ble"]["service_uuid"]
|
self.service_uuid = config["ble"]["service_uuid"]
|
||||||
self.uuid_temp = config["ble"]["characteristics"]["temperature"]
|
|
||||||
self.uuid_co2 = config["ble"]["characteristics"]["co2"]
|
|
||||||
self.uuid_humidity= config["ble"]["characteristics"]["humidity"]
|
|
||||||
|
|
||||||
# Runtime state
|
|
||||||
# latest : last known sensor values per MAC address
|
|
||||||
# connecting: MACs currently being connected (avoids duplicate attempts)
|
|
||||||
self.latest = {}
|
|
||||||
self.connecting = set()
|
|
||||||
self.scanner = None
|
|
||||||
|
|
||||||
# MQTT client setup
|
|
||||||
self.mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
||||||
self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
|
|
||||||
self.mqttc.loop_start()
|
|
||||||
|
|
||||||
log.info(f"Gateway ID : {self.gateway_id}")
|
log.info(f"Gateway ID : {self.gateway_id}")
|
||||||
log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}")
|
log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}")
|
||||||
|
|
||||||
|
# MQTT client setup
|
||||||
|
self.mqttc = mqtt.Client(
|
||||||
|
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
|
||||||
|
)
|
||||||
|
self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
|
||||||
|
self.mqttc.loop_start()
|
||||||
log.info("MQTT client connected")
|
log.info("MQTT client connected")
|
||||||
|
|
||||||
# Publish immediately on reception — topic: {gateway_id}/{mac}/update
|
def decode_payload(self, data: bytes) -> dict:
|
||||||
# Only include fields that have been received at least once
|
"""Decode key/value pairs from BLE advertising payload.
|
||||||
|
|
||||||
|
Format per firmware spec:
|
||||||
|
0x01 : window open (1 byte, 0 or 1)
|
||||||
|
0x02 : humidity (1 byte, integer %)
|
||||||
|
0x03 : temperature (2 bytes, integer / 10 = degrees C)
|
||||||
|
0x04 : CO2 ppm (4 bytes, integer)
|
||||||
|
"""
|
||||||
|
result = {}
|
||||||
|
i = 0
|
||||||
|
while i < len(data):
|
||||||
|
key = data[i]
|
||||||
|
i += 1
|
||||||
|
if key == self.KEY_WINDOW and i < len(data):
|
||||||
|
result["window_open"] = bool(data[i])
|
||||||
|
i += 1
|
||||||
|
elif key == self.KEY_HUMIDITY and i < len(data):
|
||||||
|
result["humidity"] = data[i]
|
||||||
|
i += 1
|
||||||
|
elif key == self.KEY_TEMP and i + 1 < len(data):
|
||||||
|
raw = int.from_bytes(data[i:i+2], byteorder='little')
|
||||||
|
result["temp"] = raw / 10
|
||||||
|
i += 2
|
||||||
|
elif key == self.KEY_CO2 and i + 3 < len(data):
|
||||||
|
result["co2_ppm"] = int.from_bytes(data[i:i+4], byteorder='little')
|
||||||
|
i += 4
|
||||||
|
else:
|
||||||
|
log.warning(f"Unknown key 0x{key:02x} at offset {i-1}")
|
||||||
|
break
|
||||||
|
return result
|
||||||
|
|
||||||
def publish(self, mac: str, data: dict):
|
def publish(self, mac: str, data: dict):
|
||||||
|
"""Publish decoded sensor data to MQTT broker.
|
||||||
|
Topic: {gateway_id}/{thingy_mac}/update
|
||||||
|
"""
|
||||||
topic = f"{self.gateway_id}/{mac}/update"
|
topic = f"{self.gateway_id}/{mac}/update"
|
||||||
payload = {"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")}
|
payload = {
|
||||||
if data.get("temp") is not None:
|
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||||
payload["temp"] = data.get("temp")
|
}
|
||||||
if data.get("humidity") is not None:
|
if "temp" in data:
|
||||||
payload["humidity"] = data.get("humidity")
|
payload["temp"] = data["temp"]
|
||||||
if data.get("co2") is not None:
|
if "humidity" in data:
|
||||||
payload["co2_ppm"] = data.get("co2")
|
payload["humidity"] = data["humidity"]
|
||||||
|
if "co2_ppm" in data:
|
||||||
|
payload["co2_ppm"] = data["co2_ppm"]
|
||||||
|
if "window_open" in data:
|
||||||
|
payload["window_open"] = data["window_open"]
|
||||||
|
|
||||||
self.mqttc.publish(topic, json.dumps(payload))
|
self.mqttc.publish(topic, json.dumps(payload))
|
||||||
log.info(f"Published to {topic} : {payload}")
|
log.info(f"Published to {topic} : {payload}")
|
||||||
|
|
||||||
# BLE handlers — called by bleak on each notification
|
|
||||||
# Temp : 2 bytes (integer part + decimal part)
|
|
||||||
# CO2 : uint16 little-endian, 0 means sensor is warming up
|
|
||||||
# Humidity: 1 byte, direct percentage value
|
|
||||||
def handle_temp(self, mac: str, sender, data: bytearray):
|
|
||||||
temp = data[0] + data[1] / 100
|
|
||||||
if mac in self.latest:
|
|
||||||
self.latest[mac]["temp"] = round(temp, 2)
|
|
||||||
self.publish(mac, self.latest[mac])
|
|
||||||
|
|
||||||
def handle_humidity(self, mac: str, sender, data: bytearray):
|
|
||||||
if mac in self.latest:
|
|
||||||
self.latest[mac]["humidity"] = data[0]
|
|
||||||
self.publish(mac, self.latest[mac])
|
|
||||||
|
|
||||||
def handle_co2(self, mac: str, sender, data: bytearray):
|
|
||||||
co2 = int.from_bytes(data[0:2], byteorder='little')
|
|
||||||
if co2 == 0:
|
|
||||||
log.warning(f"{mac} | CO2 sensor still warming up")
|
|
||||||
return
|
|
||||||
if mac in self.latest:
|
|
||||||
self.latest[mac]["co2"] = co2
|
|
||||||
self.publish(mac, self.latest[mac])
|
|
||||||
|
|
||||||
# Stop scanner before connecting — BlueZ does not support both simultaneously.
|
|
||||||
# Scanner is restarted once the device is connected and notifications registered.
|
|
||||||
async def connect_device(self, mac: str):
|
|
||||||
log.info(f"Attempting to connect to {mac}...")
|
|
||||||
try:
|
|
||||||
if self.scanner:
|
|
||||||
await self.scanner.stop()
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
log.info(f"Connecting to {mac}...")
|
|
||||||
|
|
||||||
def on_disconnect(client):
|
|
||||||
log.warning(f"{mac} disconnected, removing from active nodes")
|
|
||||||
self.latest.pop(mac, None)
|
|
||||||
|
|
||||||
client = BleakClient(mac, disconnected_callback=on_disconnect)
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(client.connect(), timeout=10.0)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
log.error(f"Connection timeout for {mac}")
|
|
||||||
self.connecting.discard(mac)
|
|
||||||
self.latest.pop(mac, None)
|
|
||||||
if self.scanner:
|
|
||||||
await self.scanner.start()
|
|
||||||
return
|
|
||||||
|
|
||||||
self.connecting.discard(mac)
|
|
||||||
self.latest[mac] = {"temp": None, "humidity": None, "co2": None}
|
|
||||||
log.info(f"{mac} connected")
|
|
||||||
|
|
||||||
await client.start_notify(self.uuid_temp,
|
|
||||||
lambda s, d: self.handle_temp(mac, s, d))
|
|
||||||
await client.start_notify(self.uuid_co2,
|
|
||||||
lambda s, d: self.handle_co2(mac, s, d))
|
|
||||||
await client.start_notify(self.uuid_humidity,
|
|
||||||
lambda s, d: self.handle_humidity(mac, s, d))
|
|
||||||
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
await self.scanner.start()
|
|
||||||
log.info("Scanner restarted, waiting for additional nodes")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
log.error(f"Connection error for {mac}: {e}")
|
|
||||||
self.connecting.discard(mac)
|
|
||||||
self.latest.pop(mac, None)
|
|
||||||
if self.scanner:
|
|
||||||
await self.scanner.start()
|
|
||||||
|
|
||||||
# BLE discovery callback — filters on Thingy:52 service UUID
|
|
||||||
def on_device_found(self, device, adv_data):
|
def on_device_found(self, device, adv_data):
|
||||||
|
"""BLE scan callback — filters on Thingy:52 service UUID and decodes payload."""
|
||||||
uuids = [str(u).lower() for u in adv_data.service_uuids]
|
uuids = [str(u).lower() for u in adv_data.service_uuids]
|
||||||
if self.service_uuid.lower() in uuids \
|
if self.service_uuid.lower() not in uuids:
|
||||||
and device.address not in self.latest \
|
return
|
||||||
and device.address not in self.connecting:
|
|
||||||
self.connecting.add(device.address)
|
# Try manufacturer data first, then service data
|
||||||
log.info(f"New node detected: {device.address}")
|
raw = None
|
||||||
asyncio.ensure_future(self.connect_device(device.address))
|
if adv_data.manufacturer_data:
|
||||||
|
raw = list(adv_data.manufacturer_data.values())[0]
|
||||||
|
elif adv_data.service_data:
|
||||||
|
raw = list(adv_data.service_data.values())[0]
|
||||||
|
|
||||||
|
if raw is None:
|
||||||
|
log.debug(f"{device.address} | no payload data found")
|
||||||
|
return
|
||||||
|
|
||||||
|
data = self.decode_payload(raw)
|
||||||
|
if not data:
|
||||||
|
log.warning(f"{device.address} | empty decoded payload")
|
||||||
|
return
|
||||||
|
|
||||||
|
log.debug(f"{device.address} | decoded: {data}")
|
||||||
|
self.publish(device.address, data)
|
||||||
|
|
||||||
# Start BLE scan and run indefinitely
|
|
||||||
async def run(self):
|
async def run(self):
|
||||||
log.info("BLE scan started, waiting for Thingy:52 nodes...")
|
"""Start passive BLE scan and run indefinitely."""
|
||||||
self.scanner = BleakScanner(detection_callback=self.on_device_found)
|
log.info("BLE scan started, listening for Thingy:52 advertising packets...")
|
||||||
await self.scanner.start()
|
scanner = BleakScanner(detection_callback=self.on_device_found)
|
||||||
|
await scanner.start()
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user