diff --git a/gateway/gateway.py b/gateway/gateway.py index 722026e..7ecfa59 100644 --- a/gateway/gateway.py +++ b/gateway/gateway.py @@ -1,242 +1,160 @@ import asyncio import json -import csv +import logging import os from datetime import datetime, timezone from bleak import BleakClient, BleakScanner import paho.mqtt.client as mqtt -# --------------------------------------------------------------------------- -# GATT service and characteristic UUIDs for the Nordic Thingy:52 -# These UUIDs are proprietary to Nordic Semiconductor and are used to -# identify the environmental sensor service and its characteristics over BLE. -# -# THINGY_SERVICE_UUID corresponds to the Configuration service (ef680100), -# which is the UUID advertised by the Thingy:52 in its BLE advertising packets. -# Note: the Environment service (ef680200) is available after connection -# but is not advertised, so it cannot be used for device discovery. -# --------------------------------------------------------------------------- -THINGY_SERVICE_UUID = "ef680100-9b35-4933-9b10-52ffa9740042" -UUID_TEMP = "ef680201-9b35-4933-9b10-52ffa9740042" -UUID_CO2 = "ef680204-9b35-4933-9b10-52ffa9740042" -UUID_HUMIDITY = "ef680203-9b35-4933-9b10-52ffa9740042" +# Logging — use DEBUG for development, INFO for normal operation +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S" +) +log = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# MQTT broker configuration -# The broker runs locally on the Raspberry Pi (Mosquitto). -# Other teams subscribe to the topics published here. -# --------------------------------------------------------------------------- -MQTT_BROKER = "localhost" -MQTT_PORT = 1883 -# Publishing interval in seconds (300 = every 5 minutes in production) -INTERVAL = 300 +class Gateway: + """BLE to MQTT gateway for Nordic Thingy:52 sensor nodes.""" -# --------------------------------------------------------------------------- -# Room identification -# The operator enters the room ID at startup. This value is used to name -# the output files and structure the MQTT topics accordingly. -# --------------------------------------------------------------------------- + def __init__(self, config: dict): + self.gateway_id = config["gateway_id"] + self.mqtt_broker = config["mqtt"]["broker"] + self.mqtt_port = config["mqtt"]["port"] -print("=== Gateway IoT - HES-SO ===") -ROOM_ID = input("Which room are you in? (e.g. C1, A2, B5) : ").strip().upper() -print(f"Room configured : {ROOM_ID}\n") + # GATT UUIDs loaded from config + # ef680100 is advertised in BLE packets — used for discovery + # environment service (ef680200) is only accessible after connection + self.service_uuid = config["ble"]["service_uuid"] + self.uuid_temp = config["ble"]["characteristics"]["temperature"] + self.uuid_co2 = config["ble"]["characteristics"]["co2"] + self.uuid_humidity= config["ble"]["characteristics"]["humidity"] -# --------------------------------------------------------------------------- -# Output file paths -# One CSV and one JSON file are created per room, named after the room ID. -# The CSV is intended for local analysis (e.g. Excel). -# The JSON follows the format expected by the database team. -# --------------------------------------------------------------------------- -BASE_PATH = "/home/pi/gateway" -CSV_FILE = f"{BASE_PATH}/data_{ROOM_ID}.csv" -JSON_FILE = f"{BASE_PATH}/data_{ROOM_ID}.json" + # Runtime state + # latest : last known sensor values per MAC address + # connecting: MACs currently being connected (avoids duplicate attempts) + self.latest = {} + self.connecting = set() + self.scanner = None -# --------------------------------------------------------------------------- -# Runtime state -# discovered : maps each device MAC address to its assigned node ID -# latest : stores the most recent sensor values for each device -# connecting : tracks MAC addresses currently being connected to -# to prevent duplicate connection attempts during BLE scanning -# --------------------------------------------------------------------------- -discovered = {} -latest = {} -connecting = set() -thingy_counter = [0] -scanner = None + # MQTT client setup + self.mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + self.mqttc.connect(self.mqtt_broker, self.mqtt_port) + self.mqttc.loop_start() -# --------------------------------------------------------------------------- -# MQTT client setup -# The client connects to the local Mosquitto broker and starts its -# background loop, which handles message publishing asynchronously. -# --------------------------------------------------------------------------- -mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) -mqttc.connect(MQTT_BROKER, MQTT_PORT) -mqttc.loop_start() + log.info(f"Gateway ID : {self.gateway_id}") + log.info(f"MQTT broker: {self.mqtt_broker}:{self.mqtt_port}") + log.info("MQTT client connected") -# --------------------------------------------------------------------------- -# CSV initialisation -# The header row is written only if the file does not already exist, -# so that existing data is preserved across restarts. -# --------------------------------------------------------------------------- -if not os.path.exists(CSV_FILE): - with open(CSV_FILE, "w", newline="") as f: - writer = csv.writer(f) - writer.writerow(["timestamp", "room_id", "node_id", "temperature_c", "humidity_pct", "co2_ppm"]) + # Publish immediately on reception — topic: {gateway_id}/{mac}/update + # Only include fields that have been received at least once + def publish(self, mac: str, data: dict): + topic = f"{self.gateway_id}/{mac}/update" + payload = {"timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")} + if data.get("temp") is not None: + payload["temp"] = data.get("temp") + if data.get("humidity") is not None: + payload["humidity"] = data.get("humidity") + if data.get("co2") is not None: + payload["co2_ppm"] = data.get("co2") + self.mqttc.publish(topic, json.dumps(payload)) + log.info(f"Published to {topic} : {payload}") -# --------------------------------------------------------------------------- -# BLE notification handlers -# These functions are called automatically by bleak each time the Thingy:52 -# sends a new value for the corresponding characteristic. -# -# Temperature encoding: 2 bytes -# byte 0 = integer part -# byte 1 = decimal part (hundredths) -# -# Humidity encoding: 1 byte -# byte 0 = relative humidity in percent -# -# CO2 encoding: 4 bytes (little-endian) -# bytes 0-1 = eCO2 in ppm (estimated CO2 derived from VOC measurement) -# bytes 2-3 = TVOC in ppb (not used here) -# A value of 0 indicates the sensor is still warming up. -# --------------------------------------------------------------------------- -def handle_temp(mac, sender, data): - latest[mac]["temp"] = data[0] + data[1] / 100 + # BLE handlers — called by bleak on each notification + # Temp : 2 bytes (integer part + decimal part) + # CO2 : uint16 little-endian, 0 means sensor is warming up + # Humidity: 1 byte, direct percentage value + def handle_temp(self, mac: str, sender, data: bytearray): + temp = data[0] + data[1] / 100 + if mac in self.latest: + self.latest[mac]["temp"] = round(temp, 2) + self.publish(mac, self.latest[mac]) -def handle_humidity(mac, sender, data): - latest[mac]["humidity"] = data[0] + def handle_humidity(self, mac: str, sender, data: bytearray): + if mac in self.latest: + self.latest[mac]["humidity"] = data[0] + self.publish(mac, self.latest[mac]) -def handle_co2(mac, sender, data): - co2 = int.from_bytes(data[0:2], byteorder='little') - if co2 > 0: - latest[mac]["co2"] = co2 + def handle_co2(self, mac: str, sender, data: bytearray): + co2 = int.from_bytes(data[0:2], byteorder='little') + if co2 == 0: + log.warning(f"{mac} | CO2 sensor still warming up") + return + if mac in self.latest: + self.latest[mac]["co2"] = co2 + self.publish(mac, self.latest[mac]) -# --------------------------------------------------------------------------- -# Data persistence -# Each measurement is appended to both the CSV and JSON files. -# The JSON file stores a list of all records, which is reloaded and -# extended on each write to preserve the full history. -# --------------------------------------------------------------------------- -def save_data(payload): - with open(CSV_FILE, "a", newline="") as f: - writer = csv.writer(f) - writer.writerow([ - payload["timestamp"], - payload["room_id"], - payload["node_id"], - payload["sensors"]["temperature_c"], - payload["sensors"]["humidity_pct"], - payload["sensors"]["co2_ppm"] - ]) - records = [] - if os.path.exists(JSON_FILE): - with open(JSON_FILE, "r") as f: + # Stop scanner before connecting — BlueZ does not support both simultaneously. + # Scanner is restarted once the device is connected and notifications registered. + async def connect_device(self, mac: str): + log.info(f"Attempting to connect to {mac}...") + try: + if self.scanner: + await self.scanner.stop() + await asyncio.sleep(1) + log.info(f"Connecting to {mac}...") + + def on_disconnect(client): + log.warning(f"{mac} disconnected, removing from active nodes") + self.latest.pop(mac, None) + + client = BleakClient(mac, disconnected_callback=on_disconnect) try: - records = json.load(f) - except: - records = [] - records.append(payload) - with open(JSON_FILE, "w") as f: - json.dump(records, f, indent=2) + await asyncio.wait_for(client.connect(), timeout=10.0) + except asyncio.TimeoutError: + log.error(f"Connection timeout for {mac}") + self.connecting.discard(mac) + self.latest.pop(mac, None) + if self.scanner: + await self.scanner.start() + return -# --------------------------------------------------------------------------- -# BLE device connection -# The scanner is stopped before connecting to avoid BLE stack conflicts on -# the Raspberry Pi. Once the connection is established and notifications are -# registered, the scanner is restarted to detect additional nodes. -# --------------------------------------------------------------------------- -async def connect_device(mac): - global scanner - try: - if scanner: - await scanner.stop() - await asyncio.sleep(1) + self.connecting.discard(mac) + self.latest[mac] = {"temp": None, "humidity": None, "co2": None} + log.info(f"{mac} connected") - client = BleakClient(mac) - await client.connect() + await client.start_notify(self.uuid_temp, + lambda s, d: self.handle_temp(mac, s, d)) + await client.start_notify(self.uuid_co2, + lambda s, d: self.handle_co2(mac, s, d)) + await client.start_notify(self.uuid_humidity, + lambda s, d: self.handle_humidity(mac, s, d)) - node_id = discovered[mac] - connecting.discard(mac) - print(f"{node_id} ({mac}) connected") + await asyncio.sleep(1) + await self.scanner.start() + log.info("Scanner restarted, waiting for additional nodes") - await client.start_notify(UUID_TEMP, lambda s, d: handle_temp(mac, s, d)) - await client.start_notify(UUID_CO2, lambda s, d: handle_co2(mac, s, d)) - await client.start_notify(UUID_HUMIDITY, lambda s, d: handle_humidity(mac, s, d)) + except Exception as e: + log.error(f"Connection error for {mac}: {e}") + self.connecting.discard(mac) + self.latest.pop(mac, None) + if self.scanner: + await self.scanner.start() - await asyncio.sleep(1) - await scanner.start() - print("Scanner restarted, waiting for additional nodes...") + # BLE discovery callback — filters on Thingy:52 service UUID + def on_device_found(self, device, adv_data): + uuids = [str(u).lower() for u in adv_data.service_uuids] + if self.service_uuid.lower() in uuids \ + and device.address not in self.latest \ + and device.address not in self.connecting: + self.connecting.add(device.address) + log.info(f"New node detected: {device.address}") + asyncio.ensure_future(self.connect_device(device.address)) - except Exception as e: - print(f"Connection error for {mac} : {e}") - connecting.discard(mac) - discovered.pop(mac, None) - latest.pop(mac, None) - if scanner: - await scanner.start() + # Start BLE scan and run indefinitely + async def run(self): + log.info("BLE scan started, waiting for Thingy:52 nodes...") + self.scanner = BleakScanner(detection_callback=self.on_device_found) + await self.scanner.start() + while True: + await asyncio.sleep(1) -# --------------------------------------------------------------------------- -# BLE discovery callback -# Called by bleak for every advertising packet received during the scan. -# A device is accepted only if it advertises the Thingy:52 environment -# service UUID, and only if it has not already been discovered or is not -# currently being connected to. -# --------------------------------------------------------------------------- -def on_device_found(device, adv_data): - uuids = [str(u).lower() for u in adv_data.service_uuids] - if THINGY_SERVICE_UUID.lower() in uuids \ - and device.address not in discovered \ - and device.address not in connecting: - connecting.add(device.address) - thingy_counter[0] += 1 - node_id = f"{ROOM_ID}_thingy{thingy_counter[0]}" - discovered[device.address] = node_id - latest[device.address] = {"temp": None, "humidity": None, "co2": None} - print(f"New node detected, assigned name: {node_id} ({device.address})") - asyncio.get_event_loop().create_task(connect_device(device.address)) -# --------------------------------------------------------------------------- -# Periodic MQTT publishing -# Every INTERVAL seconds, the latest sensor values from all connected nodes -# are formatted as a JSON payload and published to the MQTT broker. -# The topic structure follows: classroom/{room_id}/{node_id} -# Incomplete readings (sensor still warming up) are skipped for that cycle. -# --------------------------------------------------------------------------- -async def publish_every_interval(): - while True: - await asyncio.sleep(INTERVAL) - now = datetime.now(timezone.utc).isoformat() - print(f"\n--- {datetime.now().strftime('%H:%M:%S')} ---") - for mac, values in latest.items(): - node_id = discovered.get(mac, mac) - if any(v is None for v in values.values()): - print(f"{node_id} | incomplete data, waiting for sensor warmup...") - continue - payload = { - "timestamp": now, - "room_id": ROOM_ID, - "node_id": node_id, - "sensors": { - "co2_ppm": values["co2"], - "temperature_c": round(values["temp"], 2), - "humidity_pct": values["humidity"] - } - } - topic = f"classroom/{ROOM_ID}/{node_id}" - mqttc.publish(topic, json.dumps(payload)) - save_data(payload) - print(f"{node_id} | Temp: {values['temp']:.2f} C | Humidity: {values['humidity']}% | CO2: {values['co2']} ppm | Published") +if __name__ == "__main__": + config_path = os.path.join(os.path.dirname(__file__), "config.json") + with open(config_path, "r") as f: + config = json.load(f) -# --------------------------------------------------------------------------- -# Entry point -# Starts the BLE scanner and runs the publishing loop indefinitely. -# --------------------------------------------------------------------------- -async def main(): - global scanner - print("BLE scan started, waiting for Thingy:52 nodes...\n") - scanner = BleakScanner(detection_callback=on_device_found) - await scanner.start() - await publish_every_interval() - -asyncio.run(main()) + gateway = Gateway(config) + asyncio.run(gateway.run())