SystemExit only stops the paho-mqtt background thread, leaving the main asyncio loop running silently. os._exit kills the entire process so systemd can restart the gateway automatically.
257 lines
10 KiB
Python
257 lines
10 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from bleak import BleakScanner
|
|
import paho.mqtt.client as mqtt
|
|
|
|
# Logging — use DEBUG for development, INFO for normal operation
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
datefmt="%Y-%m-%dT%H:%M:%S"
|
|
)
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class Gateway:
|
|
"""BLE advertising listener and MQTT publisher for Nordic Thingy:52 nodes."""
|
|
|
|
# Advertising payload keys as defined in the firmware specification
|
|
KEY_WINDOW = 0x01
|
|
KEY_HUMIDITY = 0x02
|
|
KEY_TEMP = 0x03
|
|
KEY_CO2 = 0x04
|
|
KEY_BATTERY = 0x05 # Battery level (1 byte, integer %)
|
|
|
|
# Sentinel value indicating sensor failure or not ready
|
|
INVALID_VALUE = 0xFFFFFFFF
|
|
|
|
# Expected payload size in bytes:
|
|
# 5 keys (1B each) + window(1B) + humidity(1B) + temp(2B) + co2(4B) + battery(1B) = 14 bytes
|
|
EXPECTED_PAYLOAD_SIZE = 14
|
|
|
|
# Deduplication window in seconds — ignore packets from the same node
|
|
# within this period to avoid duplicate entries in the database.
|
|
# Each node advertises 3-4 times per frame on BLE channels 37, 38 and 39.
|
|
DEDUP_WINDOW_SECONDS = 10
|
|
|
|
def __init__(self, config: dict):
|
|
self.gateway_id = config["gateway_id"]
|
|
self.mqtt_broker = config["mqtt"]["broker"]
|
|
self.mqtt_port = config["mqtt"]["port"]
|
|
|
|
# BLE service UUID used to identify Thingy:52 advertising packets
|
|
self.service_uuid = config["ble"]["service_uuid"]
|
|
|
|
# Deduplication cache — stores the last time a packet was published
|
|
# for each MAC address. Packets received within DEDUP_WINDOW_SECONDS
|
|
# from the same node are ignored.
|
|
self._last_published = {}
|
|
|
|
log.info(f"Gateway ID : {self.gateway_id}")
|
|
log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}")
|
|
|
|
# MQTT client setup
|
|
self.mqttc = mqtt.Client(
|
|
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
|
|
)
|
|
|
|
# Authentication — username from config, password from environment variable
|
|
# The password is never stored in config files or source code
|
|
username = config["mqtt"].get("username")
|
|
password = os.environ.get("MQTT_PASSWORD")
|
|
if username:
|
|
self.mqttc.username_pw_set(username, password)
|
|
log.info(f"MQTT authentication configured for user: {username}")
|
|
|
|
# TLS — enabled if specified in config
|
|
# Required for MQTTS connections (port 8883)
|
|
if config["mqtt"].get("tls", False):
|
|
self.mqttc.tls_set()
|
|
log.info("TLS enabled")
|
|
|
|
# Callback to confirm connection to broker
|
|
def on_connect(client, userdata, flags, reason_code, properties):
|
|
if reason_code == 0:
|
|
log.info("Successfully connected to MQTT broker")
|
|
else:
|
|
log.error(f"Failed to connect to MQTT broker — reason code: {reason_code}")
|
|
os._exit(1)
|
|
|
|
# Callback triggered on disconnection — kill the process so systemd
|
|
# restarts the gateway automatically after RestartSec=10 seconds.
|
|
# os._exit(1) is used instead of SystemExit because on_disconnect runs
|
|
# in the paho-mqtt background thread — SystemExit would only stop that
|
|
# thread, leaving the main asyncio loop running silently.
|
|
def on_disconnect(client, userdata, flags, reason_code, properties):
|
|
log.error(f"Disconnected from MQTT broker — reason code: {reason_code}")
|
|
log.error("Exiting — systemd will restart the gateway")
|
|
os._exit(1)
|
|
|
|
# Callback to confirm message delivery to broker
|
|
def on_publish(client, userdata, mid, reason_code, properties):
|
|
log.info(f"Message confirmed by broker — mid: {mid}")
|
|
|
|
self.mqttc.on_connect = on_connect
|
|
self.mqttc.on_disconnect = on_disconnect
|
|
self.mqttc.on_publish = on_publish
|
|
|
|
self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
|
|
self.mqttc.loop_start()
|
|
log.info("MQTT client connecting...")
|
|
|
|
def decode_payload(self, data: bytes) -> dict:
|
|
"""Decode key/value pairs from BLE advertising payload.
|
|
|
|
Format per firmware spec (no preamble in raw bytes — company_id
|
|
is handled by the BLE stack and not included in manufacturer data):
|
|
0x01 : window open (1 byte, 0 or 1)
|
|
0x02 : humidity (1 byte, integer %)
|
|
0x03 : temperature (2 bytes big-endian, integer / 10 = degrees C)
|
|
0x04 : CO2 ppm (4 bytes big-endian, integer)
|
|
0x05 : battery level (1 byte, integer %)
|
|
|
|
Values equal to 0xFFFFFFFF indicate sensor failure or not ready
|
|
and are discarded.
|
|
"""
|
|
result = {}
|
|
i = 0 # no preamble to skip — company_id is not in raw bytes
|
|
while i < len(data):
|
|
key = data[i]
|
|
i += 1
|
|
if key == self.KEY_WINDOW and i < len(data):
|
|
result["window_open"] = bool(data[i])
|
|
i += 1
|
|
elif key == self.KEY_HUMIDITY and i < len(data):
|
|
result["humidity"] = data[i]
|
|
i += 1
|
|
elif key == self.KEY_TEMP and i + 1 < len(data):
|
|
# Temperature stored as integer * 10, big-endian
|
|
# Example: 25.3°C is stored as 253 (0x00FD)
|
|
raw = int.from_bytes(data[i:i+2], byteorder='big')
|
|
result["temp"] = raw / 10
|
|
i += 2
|
|
elif key == self.KEY_CO2 and i + 3 < len(data):
|
|
co2 = int.from_bytes(data[i:i+4], byteorder='big')
|
|
# 0xFFFFFFFF indicates sensor not ready or failed
|
|
if co2 != self.INVALID_VALUE:
|
|
result["co2_ppm"] = co2
|
|
else:
|
|
log.debug(f"CO2 sensor not ready — discarding value 0xFFFFFFFF")
|
|
i += 4
|
|
elif key == self.KEY_BATTERY and i < len(data):
|
|
result["battery"] = data[i]
|
|
i += 1
|
|
else:
|
|
# Unknown key — likely a non-Thingy device, ignore silently
|
|
log.debug(f"Unknown key 0x{key:02x} at offset {i-1}")
|
|
break
|
|
return result
|
|
|
|
def is_duplicate(self, mac: str) -> bool:
|
|
"""Check if a packet from this MAC was already published recently.
|
|
|
|
Each Thingy:52 advertises 3-4 times per broadcast frame on BLE
|
|
advertising channels 37, 38 and 39. Without deduplication, the same
|
|
measurement would be published multiple times to the broker.
|
|
This method ignores packets from the same node received within
|
|
DEDUP_WINDOW_SECONDS of the last published packet.
|
|
"""
|
|
now = datetime.now(timezone.utc)
|
|
last = self._last_published.get(mac)
|
|
if last and (now - last).total_seconds() < self.DEDUP_WINDOW_SECONDS:
|
|
log.debug(f"{mac} | duplicate ignored — last published {(now - last).total_seconds():.1f}s ago")
|
|
return True
|
|
self._last_published[mac] = now
|
|
return False
|
|
|
|
def publish(self, mac: str, data: dict):
|
|
"""Publish decoded sensor data to MQTT broker.
|
|
Topic: {gateway_id}/{thingy_mac}/update
|
|
|
|
Checks the return code of mqttc.publish() and exits if an error
|
|
is detected, so systemd can restart the gateway automatically.
|
|
"""
|
|
topic = f"{self.gateway_id}/{mac}/update"
|
|
payload = {
|
|
"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
|
}
|
|
if "temp" in data:
|
|
payload["temp"] = data["temp"]
|
|
if "humidity" in data:
|
|
payload["humidity"] = data["humidity"]
|
|
if "co2_ppm" in data:
|
|
payload["co2_ppm"] = data["co2_ppm"]
|
|
if "window_open" in data:
|
|
payload["window_open"] = data["window_open"]
|
|
if "battery" in data:
|
|
payload["battery"] = data["battery"]
|
|
|
|
result = self.mqttc.publish(topic, json.dumps(payload))
|
|
|
|
# Check if publish was successful — rc=0 means success
|
|
# Any other value indicates an error (e.g. connection lost)
|
|
if result.rc != mqtt.MQTT_ERR_SUCCESS:
|
|
log.error(f"Failed to publish to {topic} — rc: {result.rc}")
|
|
log.error("Exiting — systemd will restart the gateway")
|
|
os._exit(1)
|
|
|
|
log.info(f"Published to {topic} : {payload}")
|
|
|
|
def on_device_found(self, device, adv_data):
|
|
"""BLE scan callback — filters on company_id 0xffff in manufacturer data."""
|
|
if 0xffff not in adv_data.manufacturer_data:
|
|
return
|
|
|
|
raw = adv_data.manufacturer_data[0xffff]
|
|
|
|
# Filter on exact payload size to avoid false positives from
|
|
# other BLE devices using the same company_id
|
|
if len(raw) != self.EXPECTED_PAYLOAD_SIZE:
|
|
log.debug(f"{device.address} | ignored — unexpected payload size: {len(raw)}")
|
|
return
|
|
|
|
# Deduplication — ignore packets from the same node within 10 seconds
|
|
if self.is_duplicate(device.address):
|
|
return
|
|
|
|
log.debug(f"{device.address} | Thingy detected, raw: {list(raw)}")
|
|
|
|
data = self.decode_payload(raw)
|
|
if not data:
|
|
log.debug(f"{device.address} | empty decoded payload — ignored")
|
|
return
|
|
|
|
log.debug(f"{device.address} | decoded: {data}")
|
|
self.publish(device.address, data)
|
|
|
|
async def run(self):
|
|
"""Start passive BLE scan and run indefinitely.
|
|
|
|
Scanner is configured with a continuous scan (interval = window = 10ms)
|
|
to ensure no advertising packets are missed. By default, BlueZ uses
|
|
an interval of 1280ms and a window of 11.25ms — less than 1% duty cycle.
|
|
With the Thingy:52 advertising window of only 500ms, packets could be
|
|
missed. Setting interval = window guarantees continuous listening on
|
|
all three BLE advertising channels (37, 38 and 39).
|
|
"""
|
|
log.info("BLE scan started, listening for Thingy:52 advertising packets...")
|
|
scanner = BleakScanner(
|
|
detection_callback=self.on_device_found,
|
|
bluez={"interval": 0x0010, "window": 0x0010}
|
|
)
|
|
await scanner.start()
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
config_path = os.path.join(os.path.dirname(__file__), "config.json")
|
|
with open(config_path, "r") as f:
|
|
config = json.load(f)
|
|
|
|
gateway = Gateway(config)
|
|
asyncio.run(gateway.run())
|