From eecb4a196bc3250123ffe708dc5af511a1d81466 Mon Sep 17 00:00:00 2001 From: Klagarge Date: Fri, 29 May 2026 23:13:41 +0200 Subject: [PATCH] refactor(db): add flag to run without MQTT part Use for local test for developement Signed-off-by: Klagarge --- db/src/main.go | 123 ++++++++++++++++++++++++++----------------------- 1 file changed, 65 insertions(+), 58 deletions(-) diff --git a/db/src/main.go b/db/src/main.go index 867bbb8..108fd3e 100644 --- a/db/src/main.go +++ b/db/src/main.go @@ -8,6 +8,7 @@ import ( "gateway/rest" "log" "os" + "slices" "strings" "time" ) @@ -92,84 +93,90 @@ func influxConnection() *influx.InfluxGateway { // @BasePath /api/v1 // @securityDefinitions.basic BasicAuth func main() { + noMqtt := slices.Contains(os.Args[1:], "--no-mqtt") + // Load mapping configuration (reloaded dynamically on each access) mappingPath := getEnv("MAPPING_CONFIG_PATH", "mapping.json") mapping := NewDynamicMapping(mappingPath) - mqttGateway := mqttConnection() - defer mqttGateway.Disconnect() - influxGateway := influxConnection() defer influxGateway.Close() measurementName := getEnv("CAMPUS", "provence") - // Create measurement for provence topic - provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName) - // The incoming MQTT topic structure is: //update - topicStructur := []string{"gateway", "node"} - err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) { - var gatewayID, nodeID string - for _, tag := range dp.Tags() { - switch tag.Subject { - case "gateway": - gatewayID = tag.Content - case "node": - nodeID = tag.Content + if !noMqtt { + mqttGateway := mqttConnection() + defer mqttGateway.Disconnect() + + // Create measurement for provence topic + provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName) + // The incoming MQTT topic structure is: //update + topicStructur := []string{"gateway", "node"} + err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) { + var gatewayID, nodeID string + for _, tag := range dp.Tags() { + switch tag.Subject { + case "gateway": + gatewayID = tag.Content + case "node": + nodeID = tag.Content + } } - } - campus, campusOk := mapping.GetCampus(gatewayID) - room, roomOk := mapping.GetRoom(nodeID) + campus, campusOk := mapping.GetCampus(gatewayID) + room, roomOk := mapping.GetRoom(nodeID) - batteryLevel := dp.GetValues().Battery - log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s (Battery %d%%)\n", gatewayID, nodeID, campus, room, batteryLevel) + batteryLevel := dp.GetValues().Battery + log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s (Battery %d%%)\n", gatewayID, nodeID, campus, room, batteryLevel) - var influxTags []point.Topic + var influxTags []point.Topic - if !campusOk { - log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID) - influxTags = []point.Topic{ - {Subject: "node", Content: nodeID}, + if !campusOk { + log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID) + influxTags = []point.Topic{ + {Subject: "node", Content: nodeID}, + } + } else if !roomOk { + log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID) + influxTags = []point.Topic{ + {Subject: "campus", Content: campus}, + {Subject: "node", Content: nodeID}, + } + } else { + influxTags = []point.Topic{ + {Subject: "campus", Content: campus}, + {Subject: "room", Content: room}, + {Subject: "node", Content: nodeID}, + } } - } else if !roomOk { - log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID) - influxTags = []point.Topic{ - {Subject: "campus", Content: campus}, - {Subject: "node", Content: nodeID}, + + // If CO2PPM value is present and over 1,000,000,000 delete the field, it's calibration value + values := dp.GetValues() + if values.CO2PPM > 1000000000 { + log.Printf("[Main] Warning: CO2PPM value %d is calibrating, setting to 0\n", values.CO2PPM) + values.CO2PPM = 0 } - } else { - influxTags = []point.Topic{ - {Subject: "campus", Content: campus}, - {Subject: "room", Content: room}, - {Subject: "node", Content: nodeID}, + + // Still too high value, something wrong, dropping datapoint + if values.CO2PPM > 10000 { + log.Printf("[Main] Error: CO2PPM value %d is over threshold, dropping Datapoint\n", values.CO2PPM) + return } - } - // If CO2PPM value is present and over 1,000,000,000 delete the field, it's calibration value - values := dp.GetValues() - if values.CO2PPM > 1000000000 { - log.Printf("[Main] Warning: CO2PPM value %d is calibrating, setting to 0\n", values.CO2PPM) - values.CO2PPM = 0 - } + translatedDp := provenceMeasurement.CreateDataPoint(influxTags, values, dp.Timestamp()) - // Still too high value, something wrong, dropping datapoint - if values.CO2PPM > 10000 { - log.Printf("[Main] Error: CO2PPM value %d is over threshold, dropping Datapoint\n", values.CO2PPM) - return + if err := influxGateway.AddDatapoint(&translatedDp); err != nil { + log.Printf("[Main] Error adding datapoint to influx: %v\n", err) + } + if err := influxGateway.Flush(); err != nil { + log.Printf("[Main] Error flushing to influx: %v\n", err) + } + }) + if err != nil { + log.Fatal(err) } - - translatedDp := provenceMeasurement.CreateDataPoint(influxTags, values, dp.Timestamp()) - - if err := influxGateway.AddDatapoint(&translatedDp); err != nil { - log.Printf("[Main] Error adding datapoint to influx: %v\n", err) - } - if err := influxGateway.Flush(); err != nil { - log.Printf("[Main] Error flushing to influx: %v\n", err) - } - }) - if err != nil { - log.Fatal(err) + } else { + log.Println("[Main] MQTT disabled (--no-mqtt flag set)") } // Initialize and start REST Gateway