refactor(db): add flag to run without MQTT part
Use for local test for developement Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
123
db/src/main.go
123
db/src/main.go
@@ -8,6 +8,7 @@ import (
|
|||||||
"gateway/rest"
|
"gateway/rest"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -92,84 +93,90 @@ func influxConnection() *influx.InfluxGateway {
|
|||||||
// @BasePath /api/v1
|
// @BasePath /api/v1
|
||||||
// @securityDefinitions.basic BasicAuth
|
// @securityDefinitions.basic BasicAuth
|
||||||
func main() {
|
func main() {
|
||||||
|
noMqtt := slices.Contains(os.Args[1:], "--no-mqtt")
|
||||||
|
|
||||||
// Load mapping configuration (reloaded dynamically on each access)
|
// Load mapping configuration (reloaded dynamically on each access)
|
||||||
mappingPath := getEnv("MAPPING_CONFIG_PATH", "mapping.json")
|
mappingPath := getEnv("MAPPING_CONFIG_PATH", "mapping.json")
|
||||||
mapping := NewDynamicMapping(mappingPath)
|
mapping := NewDynamicMapping(mappingPath)
|
||||||
|
|
||||||
mqttGateway := mqttConnection()
|
|
||||||
defer mqttGateway.Disconnect()
|
|
||||||
|
|
||||||
influxGateway := influxConnection()
|
influxGateway := influxConnection()
|
||||||
defer influxGateway.Close()
|
defer influxGateway.Close()
|
||||||
|
|
||||||
measurementName := getEnv("CAMPUS", "provence")
|
measurementName := getEnv("CAMPUS", "provence")
|
||||||
|
|
||||||
// Create measurement for provence topic
|
if !noMqtt {
|
||||||
provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName)
|
mqttGateway := mqttConnection()
|
||||||
// The incoming MQTT topic structure is: <gateway_id>/<node_id>/update
|
defer mqttGateway.Disconnect()
|
||||||
topicStructur := []string{"gateway", "node"}
|
|
||||||
err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) {
|
// Create measurement for provence topic
|
||||||
var gatewayID, nodeID string
|
provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName)
|
||||||
for _, tag := range dp.Tags() {
|
// The incoming MQTT topic structure is: <gateway_id>/<node_id>/update
|
||||||
switch tag.Subject {
|
topicStructur := []string{"gateway", "node"}
|
||||||
case "gateway":
|
err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) {
|
||||||
gatewayID = tag.Content
|
var gatewayID, nodeID string
|
||||||
case "node":
|
for _, tag := range dp.Tags() {
|
||||||
nodeID = tag.Content
|
switch tag.Subject {
|
||||||
|
case "gateway":
|
||||||
|
gatewayID = tag.Content
|
||||||
|
case "node":
|
||||||
|
nodeID = tag.Content
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
campus, campusOk := mapping.GetCampus(gatewayID)
|
campus, campusOk := mapping.GetCampus(gatewayID)
|
||||||
room, roomOk := mapping.GetRoom(nodeID)
|
room, roomOk := mapping.GetRoom(nodeID)
|
||||||
|
|
||||||
batteryLevel := dp.GetValues().Battery
|
batteryLevel := dp.GetValues().Battery
|
||||||
log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s (Battery %d%%)\n", gatewayID, nodeID, campus, room, batteryLevel)
|
log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s (Battery %d%%)\n", gatewayID, nodeID, campus, room, batteryLevel)
|
||||||
|
|
||||||
var influxTags []point.Topic
|
var influxTags []point.Topic
|
||||||
|
|
||||||
if !campusOk {
|
if !campusOk {
|
||||||
log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID)
|
log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID)
|
||||||
influxTags = []point.Topic{
|
influxTags = []point.Topic{
|
||||||
{Subject: "node", Content: nodeID},
|
{Subject: "node", Content: nodeID},
|
||||||
|
}
|
||||||
|
} else if !roomOk {
|
||||||
|
log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID)
|
||||||
|
influxTags = []point.Topic{
|
||||||
|
{Subject: "campus", Content: campus},
|
||||||
|
{Subject: "node", Content: nodeID},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
influxTags = []point.Topic{
|
||||||
|
{Subject: "campus", Content: campus},
|
||||||
|
{Subject: "room", Content: room},
|
||||||
|
{Subject: "node", Content: nodeID},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if !roomOk {
|
|
||||||
log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID)
|
// If CO2PPM value is present and over 1,000,000,000 delete the field, it's calibration value
|
||||||
influxTags = []point.Topic{
|
values := dp.GetValues()
|
||||||
{Subject: "campus", Content: campus},
|
if values.CO2PPM > 1000000000 {
|
||||||
{Subject: "node", Content: nodeID},
|
log.Printf("[Main] Warning: CO2PPM value %d is calibrating, setting to 0\n", values.CO2PPM)
|
||||||
|
values.CO2PPM = 0
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
influxTags = []point.Topic{
|
// Still too high value, something wrong, dropping datapoint
|
||||||
{Subject: "campus", Content: campus},
|
if values.CO2PPM > 10000 {
|
||||||
{Subject: "room", Content: room},
|
log.Printf("[Main] Error: CO2PPM value %d is over threshold, dropping Datapoint\n", values.CO2PPM)
|
||||||
{Subject: "node", Content: nodeID},
|
return
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// If CO2PPM value is present and over 1,000,000,000 delete the field, it's calibration value
|
translatedDp := provenceMeasurement.CreateDataPoint(influxTags, values, dp.Timestamp())
|
||||||
values := dp.GetValues()
|
|
||||||
if values.CO2PPM > 1000000000 {
|
|
||||||
log.Printf("[Main] Warning: CO2PPM value %d is calibrating, setting to 0\n", values.CO2PPM)
|
|
||||||
values.CO2PPM = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// Still too high value, something wrong, dropping datapoint
|
if err := influxGateway.AddDatapoint(&translatedDp); err != nil {
|
||||||
if values.CO2PPM > 10000 {
|
log.Printf("[Main] Error adding datapoint to influx: %v\n", err)
|
||||||
log.Printf("[Main] Error: CO2PPM value %d is over threshold, dropping Datapoint\n", values.CO2PPM)
|
}
|
||||||
return
|
if err := influxGateway.Flush(); err != nil {
|
||||||
|
log.Printf("[Main] Error flushing to influx: %v\n", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
translatedDp := provenceMeasurement.CreateDataPoint(influxTags, values, dp.Timestamp())
|
log.Println("[Main] MQTT disabled (--no-mqtt flag set)")
|
||||||
|
|
||||||
if err := influxGateway.AddDatapoint(&translatedDp); err != nil {
|
|
||||||
log.Printf("[Main] Error adding datapoint to influx: %v\n", err)
|
|
||||||
}
|
|
||||||
if err := influxGateway.Flush(); err != nil {
|
|
||||||
log.Printf("[Main] Error flushing to influx: %v\n", err)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize and start REST Gateway
|
// Initialize and start REST Gateway
|
||||||
|
|||||||
Reference in New Issue
Block a user