feat(gateway): implement BLE-to-MQTT gateway
Implement Gateway class that discovers Nordic Thingy:52 nodes via BLE
and publishes sensor data to MQTT broker on each notification received.
- Automatic node discovery via BLE service UUID (ef680100)
- GATT notifications for temperature, humidity and CO2
- Publish immediately on reception to {gateway_id}/{mac}/update
- Connection timeout to avoid blocking on unreachable nodes
- Disconnection detection and automatic reintegration into scan
- Logging with DEBUG/INFO/WARNING/ERROR levels
Assisted-by: Claude:claude-sonnet-4-6 — debugging BLE parallel connections (BlueZ InProgress error), GATT UUID discovery (ef680100 vs ef680200), byte decoding for temperature/humidity/CO2, async connection timeout implementation
This commit is contained in:
@@ -1,242 +1,160 @@
|
||||
import asyncio
|
||||
import json
|
||||
import csv
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from bleak import BleakClient, BleakScanner
|
||||
import paho.mqtt.client as mqtt
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GATT service and characteristic UUIDs for the Nordic Thingy:52
|
||||
# These UUIDs are proprietary to Nordic Semiconductor and are used to
|
||||
# identify the environmental sensor service and its characteristics over BLE.
|
||||
#
|
||||
# THINGY_SERVICE_UUID corresponds to the Configuration service (ef680100),
|
||||
# which is the UUID advertised by the Thingy:52 in its BLE advertising packets.
|
||||
# Note: the Environment service (ef680200) is available after connection
|
||||
# but is not advertised, so it cannot be used for device discovery.
|
||||
# ---------------------------------------------------------------------------
|
||||
THINGY_SERVICE_UUID = "ef680100-9b35-4933-9b10-52ffa9740042"
|
||||
UUID_TEMP = "ef680201-9b35-4933-9b10-52ffa9740042"
|
||||
UUID_CO2 = "ef680204-9b35-4933-9b10-52ffa9740042"
|
||||
UUID_HUMIDITY = "ef680203-9b35-4933-9b10-52ffa9740042"
|
||||
# Logging — use DEBUG for development, INFO for normal operation
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||||
datefmt="%Y-%m-%dT%H:%M:%S"
|
||||
)
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MQTT broker configuration
|
||||
# The broker runs locally on the Raspberry Pi (Mosquitto).
|
||||
# Other teams subscribe to the topics published here.
|
||||
# ---------------------------------------------------------------------------
|
||||
MQTT_BROKER = "localhost"
|
||||
MQTT_PORT = 1883
|
||||
|
||||
# Publishing interval in seconds (300 = every 5 minutes in production)
|
||||
INTERVAL = 300
|
||||
class Gateway:
|
||||
"""BLE to MQTT gateway for Nordic Thingy:52 sensor nodes."""
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Room identification
|
||||
# The operator enters the room ID at startup. This value is used to name
|
||||
# the output files and structure the MQTT topics accordingly.
|
||||
# ---------------------------------------------------------------------------
|
||||
def __init__(self, config: dict):
|
||||
self.gateway_id = config["gateway_id"]
|
||||
self.mqtt_broker = config["mqtt"]["broker"]
|
||||
self.mqtt_port = config["mqtt"]["port"]
|
||||
|
||||
print("=== Gateway IoT - HES-SO ===")
|
||||
ROOM_ID = input("Which room are you in? (e.g. C1, A2, B5) : ").strip().upper()
|
||||
print(f"Room configured : {ROOM_ID}\n")
|
||||
# GATT UUIDs loaded from config
|
||||
# ef680100 is advertised in BLE packets — used for discovery
|
||||
# environment service (ef680200) is only accessible after connection
|
||||
self.service_uuid = config["ble"]["service_uuid"]
|
||||
self.uuid_temp = config["ble"]["characteristics"]["temperature"]
|
||||
self.uuid_co2 = config["ble"]["characteristics"]["co2"]
|
||||
self.uuid_humidity= config["ble"]["characteristics"]["humidity"]
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Output file paths
|
||||
# One CSV and one JSON file are created per room, named after the room ID.
|
||||
# The CSV is intended for local analysis (e.g. Excel).
|
||||
# The JSON follows the format expected by the database team.
|
||||
# ---------------------------------------------------------------------------
|
||||
BASE_PATH = "/home/pi/gateway"
|
||||
CSV_FILE = f"{BASE_PATH}/data_{ROOM_ID}.csv"
|
||||
JSON_FILE = f"{BASE_PATH}/data_{ROOM_ID}.json"
|
||||
# Runtime state
|
||||
# latest : last known sensor values per MAC address
|
||||
# connecting: MACs currently being connected (avoids duplicate attempts)
|
||||
self.latest = {}
|
||||
self.connecting = set()
|
||||
self.scanner = None
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runtime state
|
||||
# discovered : maps each device MAC address to its assigned node ID
|
||||
# latest : stores the most recent sensor values for each device
|
||||
# connecting : tracks MAC addresses currently being connected to
|
||||
# to prevent duplicate connection attempts during BLE scanning
|
||||
# ---------------------------------------------------------------------------
|
||||
discovered = {}
|
||||
latest = {}
|
||||
connecting = set()
|
||||
thingy_counter = [0]
|
||||
scanner = None
|
||||
# MQTT client setup
|
||||
self.mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
self.mqttc.connect(self.mqtt_broker, self.mqtt_port)
|
||||
self.mqttc.loop_start()
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MQTT client setup
|
||||
# The client connects to the local Mosquitto broker and starts its
|
||||
# background loop, which handles message publishing asynchronously.
|
||||
# ---------------------------------------------------------------------------
|
||||
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
||||
mqttc.connect(MQTT_BROKER, MQTT_PORT)
|
||||
mqttc.loop_start()
|
||||
log.info(f"Gateway ID : {self.gateway_id}")
|
||||
log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}")
|
||||
log.info("MQTT client connected")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CSV initialisation
|
||||
# The header row is written only if the file does not already exist,
|
||||
# so that existing data is preserved across restarts.
|
||||
# ---------------------------------------------------------------------------
|
||||
if not os.path.exists(CSV_FILE):
|
||||
with open(CSV_FILE, "w", newline="") as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(["timestamp", "room_id", "node_id", "temperature_c", "humidity_pct", "co2_ppm"])
|
||||
# Publish immediately on reception — topic: {gateway_id}/{mac}/update
|
||||
# Only include fields that have been received at least once
|
||||
def publish(self, mac: str, data: dict):
|
||||
topic = f"{self.gateway_id}/{mac}/update"
|
||||
payload = {"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")}
|
||||
if data.get("temp") is not None:
|
||||
payload["temp"] = data.get("temp")
|
||||
if data.get("humidity") is not None:
|
||||
payload["humidity"] = data.get("humidity")
|
||||
if data.get("co2") is not None:
|
||||
payload["co2_ppm"] = data.get("co2")
|
||||
self.mqttc.publish(topic, json.dumps(payload))
|
||||
log.info(f"Published to {topic} : {payload}")
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BLE notification handlers
|
||||
# These functions are called automatically by bleak each time the Thingy:52
|
||||
# sends a new value for the corresponding characteristic.
|
||||
#
|
||||
# Temperature encoding: 2 bytes
|
||||
# byte 0 = integer part
|
||||
# byte 1 = decimal part (hundredths)
|
||||
#
|
||||
# Humidity encoding: 1 byte
|
||||
# byte 0 = relative humidity in percent
|
||||
#
|
||||
# CO2 encoding: 4 bytes (little-endian)
|
||||
# bytes 0-1 = eCO2 in ppm (estimated CO2 derived from VOC measurement)
|
||||
# bytes 2-3 = TVOC in ppb (not used here)
|
||||
# A value of 0 indicates the sensor is still warming up.
|
||||
# ---------------------------------------------------------------------------
|
||||
def handle_temp(mac, sender, data):
|
||||
latest[mac]["temp"] = data[0] + data[1] / 100
|
||||
# BLE handlers — called by bleak on each notification
|
||||
# Temp : 2 bytes (integer part + decimal part)
|
||||
# CO2 : uint16 little-endian, 0 means sensor is warming up
|
||||
# Humidity: 1 byte, direct percentage value
|
||||
def handle_temp(self, mac: str, sender, data: bytearray):
|
||||
temp = data[0] + data[1] / 100
|
||||
if mac in self.latest:
|
||||
self.latest[mac]["temp"] = round(temp, 2)
|
||||
self.publish(mac, self.latest[mac])
|
||||
|
||||
def handle_humidity(mac, sender, data):
|
||||
latest[mac]["humidity"] = data[0]
|
||||
def handle_humidity(self, mac: str, sender, data: bytearray):
|
||||
if mac in self.latest:
|
||||
self.latest[mac]["humidity"] = data[0]
|
||||
self.publish(mac, self.latest[mac])
|
||||
|
||||
def handle_co2(mac, sender, data):
|
||||
co2 = int.from_bytes(data[0:2], byteorder='little')
|
||||
if co2 > 0:
|
||||
latest[mac]["co2"] = co2
|
||||
def handle_co2(self, mac: str, sender, data: bytearray):
|
||||
co2 = int.from_bytes(data[0:2], byteorder='little')
|
||||
if co2 == 0:
|
||||
log.warning(f"{mac} | CO2 sensor still warming up")
|
||||
return
|
||||
if mac in self.latest:
|
||||
self.latest[mac]["co2"] = co2
|
||||
self.publish(mac, self.latest[mac])
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data persistence
|
||||
# Each measurement is appended to both the CSV and JSON files.
|
||||
# The JSON file stores a list of all records, which is reloaded and
|
||||
# extended on each write to preserve the full history.
|
||||
# ---------------------------------------------------------------------------
|
||||
def save_data(payload):
|
||||
with open(CSV_FILE, "a", newline="") as f:
|
||||
writer = csv.writer(f)
|
||||
writer.writerow([
|
||||
payload["timestamp"],
|
||||
payload["room_id"],
|
||||
payload["node_id"],
|
||||
payload["sensors"]["temperature_c"],
|
||||
payload["sensors"]["humidity_pct"],
|
||||
payload["sensors"]["co2_ppm"]
|
||||
])
|
||||
records = []
|
||||
if os.path.exists(JSON_FILE):
|
||||
with open(JSON_FILE, "r") as f:
|
||||
# Stop scanner before connecting — BlueZ does not support both simultaneously.
|
||||
# Scanner is restarted once the device is connected and notifications registered.
|
||||
async def connect_device(self, mac: str):
|
||||
log.info(f"Attempting to connect to {mac}...")
|
||||
try:
|
||||
if self.scanner:
|
||||
await self.scanner.stop()
|
||||
await asyncio.sleep(1)
|
||||
log.info(f"Connecting to {mac}...")
|
||||
|
||||
def on_disconnect(client):
|
||||
log.warning(f"{mac} disconnected, removing from active nodes")
|
||||
self.latest.pop(mac, None)
|
||||
|
||||
client = BleakClient(mac, disconnected_callback=on_disconnect)
|
||||
try:
|
||||
records = json.load(f)
|
||||
except:
|
||||
records = []
|
||||
records.append(payload)
|
||||
with open(JSON_FILE, "w") as f:
|
||||
json.dump(records, f, indent=2)
|
||||
await asyncio.wait_for(client.connect(), timeout=10.0)
|
||||
except asyncio.TimeoutError:
|
||||
log.error(f"Connection timeout for {mac}")
|
||||
self.connecting.discard(mac)
|
||||
self.latest.pop(mac, None)
|
||||
if self.scanner:
|
||||
await self.scanner.start()
|
||||
return
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BLE device connection
|
||||
# The scanner is stopped before connecting to avoid BLE stack conflicts on
|
||||
# the Raspberry Pi. Once the connection is established and notifications are
|
||||
# registered, the scanner is restarted to detect additional nodes.
|
||||
# ---------------------------------------------------------------------------
|
||||
async def connect_device(mac):
|
||||
global scanner
|
||||
try:
|
||||
if scanner:
|
||||
await scanner.stop()
|
||||
await asyncio.sleep(1)
|
||||
self.connecting.discard(mac)
|
||||
self.latest[mac] = {"temp": None, "humidity": None, "co2": None}
|
||||
log.info(f"{mac} connected")
|
||||
|
||||
client = BleakClient(mac)
|
||||
await client.connect()
|
||||
await client.start_notify(self.uuid_temp,
|
||||
lambda s, d: self.handle_temp(mac, s, d))
|
||||
await client.start_notify(self.uuid_co2,
|
||||
lambda s, d: self.handle_co2(mac, s, d))
|
||||
await client.start_notify(self.uuid_humidity,
|
||||
lambda s, d: self.handle_humidity(mac, s, d))
|
||||
|
||||
node_id = discovered[mac]
|
||||
connecting.discard(mac)
|
||||
print(f"{node_id} ({mac}) connected")
|
||||
await asyncio.sleep(1)
|
||||
await self.scanner.start()
|
||||
log.info("Scanner restarted, waiting for additional nodes")
|
||||
|
||||
await client.start_notify(UUID_TEMP, lambda s, d: handle_temp(mac, s, d))
|
||||
await client.start_notify(UUID_CO2, lambda s, d: handle_co2(mac, s, d))
|
||||
await client.start_notify(UUID_HUMIDITY, lambda s, d: handle_humidity(mac, s, d))
|
||||
except Exception as e:
|
||||
log.error(f"Connection error for {mac}: {e}")
|
||||
self.connecting.discard(mac)
|
||||
self.latest.pop(mac, None)
|
||||
if self.scanner:
|
||||
await self.scanner.start()
|
||||
|
||||
await asyncio.sleep(1)
|
||||
await scanner.start()
|
||||
print("Scanner restarted, waiting for additional nodes...")
|
||||
# BLE discovery callback — filters on Thingy:52 service UUID
|
||||
def on_device_found(self, device, adv_data):
|
||||
uuids = [str(u).lower() for u in adv_data.service_uuids]
|
||||
if self.service_uuid.lower() in uuids \
|
||||
and device.address not in self.latest \
|
||||
and device.address not in self.connecting:
|
||||
self.connecting.add(device.address)
|
||||
log.info(f"New node detected: {device.address}")
|
||||
asyncio.ensure_future(self.connect_device(device.address))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Connection error for {mac} : {e}")
|
||||
connecting.discard(mac)
|
||||
discovered.pop(mac, None)
|
||||
latest.pop(mac, None)
|
||||
if scanner:
|
||||
await scanner.start()
|
||||
# Start BLE scan and run indefinitely
|
||||
async def run(self):
|
||||
log.info("BLE scan started, waiting for Thingy:52 nodes...")
|
||||
self.scanner = BleakScanner(detection_callback=self.on_device_found)
|
||||
await self.scanner.start()
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BLE discovery callback
|
||||
# Called by bleak for every advertising packet received during the scan.
|
||||
# A device is accepted only if it advertises the Thingy:52 environment
|
||||
# service UUID, and only if it has not already been discovered or is not
|
||||
# currently being connected to.
|
||||
# ---------------------------------------------------------------------------
|
||||
def on_device_found(device, adv_data):
|
||||
uuids = [str(u).lower() for u in adv_data.service_uuids]
|
||||
if THINGY_SERVICE_UUID.lower() in uuids \
|
||||
and device.address not in discovered \
|
||||
and device.address not in connecting:
|
||||
connecting.add(device.address)
|
||||
thingy_counter[0] += 1
|
||||
node_id = f"{ROOM_ID}_thingy{thingy_counter[0]}"
|
||||
discovered[device.address] = node_id
|
||||
latest[device.address] = {"temp": None, "humidity": None, "co2": None}
|
||||
print(f"New node detected, assigned name: {node_id} ({device.address})")
|
||||
asyncio.get_event_loop().create_task(connect_device(device.address))
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Periodic MQTT publishing
|
||||
# Every INTERVAL seconds, the latest sensor values from all connected nodes
|
||||
# are formatted as a JSON payload and published to the MQTT broker.
|
||||
# The topic structure follows: classroom/{room_id}/{node_id}
|
||||
# Incomplete readings (sensor still warming up) are skipped for that cycle.
|
||||
# ---------------------------------------------------------------------------
|
||||
async def publish_every_interval():
|
||||
while True:
|
||||
await asyncio.sleep(INTERVAL)
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
print(f"\n--- {datetime.now().strftime('%H:%M:%S')} ---")
|
||||
for mac, values in latest.items():
|
||||
node_id = discovered.get(mac, mac)
|
||||
if any(v is None for v in values.values()):
|
||||
print(f"{node_id} | incomplete data, waiting for sensor warmup...")
|
||||
continue
|
||||
payload = {
|
||||
"timestamp": now,
|
||||
"room_id": ROOM_ID,
|
||||
"node_id": node_id,
|
||||
"sensors": {
|
||||
"co2_ppm": values["co2"],
|
||||
"temperature_c": round(values["temp"], 2),
|
||||
"humidity_pct": values["humidity"]
|
||||
}
|
||||
}
|
||||
topic = f"classroom/{ROOM_ID}/{node_id}"
|
||||
mqttc.publish(topic, json.dumps(payload))
|
||||
save_data(payload)
|
||||
print(f"{node_id} | Temp: {values['temp']:.2f} C | Humidity: {values['humidity']}% | CO2: {values['co2']} ppm | Published")
|
||||
if __name__ == "__main__":
|
||||
config_path = os.path.join(os.path.dirname(__file__), "config.json")
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entry point
|
||||
# Starts the BLE scanner and runs the publishing loop indefinitely.
|
||||
# ---------------------------------------------------------------------------
|
||||
async def main():
|
||||
global scanner
|
||||
print("BLE scan started, waiting for Thingy:52 nodes...\n")
|
||||
scanner = BleakScanner(detection_callback=on_device_found)
|
||||
await scanner.start()
|
||||
await publish_every_interval()
|
||||
|
||||
asyncio.run(main())
|
||||
gateway = Gateway(config)
|
||||
asyncio.run(gateway.run())
|
||||
|
||||
Reference in New Issue
Block a user