Files
MSE-PI-E2EEDA-Plein-de-eeee…/db/src/main.go
Klagarge eecb4a196b refactor(db): add flag to run without MQTT part
Use for local test for developement

Signed-off-by: Klagarge <remi@heredero.ch>
2026-06-04 14:46:35 +02:00

196 lines
5.4 KiB
Go

package main
import (
"fmt"
"gateway/influx"
"gateway/mqtt"
point "gateway/point"
"gateway/rest"
"log"
"os"
"slices"
"strings"
"time"
)
func getEnv(key, fallback string) string {
if value := os.Getenv(key); value != "" {
return value
}
return fallback
}
type ProvenceData struct {
CO2PPM int `json:"co2_ppm"`
Temp float64 `json:"temp"`
Humidity int `json:"humidity"`
Battery int `json:"battery"`
Window bool `json:"window_open"`
}
func mqttConnection() *mqtt.MqttGateway {
BrokerUrl := getEnv("MQTT_BROKER_URL", "tls://localhost:8883")
Username := getEnv("MQTT_USERNAME", "user")
Password := getEnv("MQTT_PASSWORD", "password")
ClientId := "mqtt-gateway-test-client_" + fmt.Sprint(time.Now().Unix())
// Create config & gateway
mqttP := &mqtt.MqttParams{
Broker: BrokerUrl,
ClientId: ClientId,
Qos: 1,
Username: Username,
Password: Password,
TlsConfig: nil,
OnConnect: nil,
OnConnectionLost: nil,
Timeout: 1 * time.Second,
}
gateway, err := mqtt.NewMqttGateway(*mqttP)
if err != nil {
log.Fatal(err)
}
return gateway
}
func influxConnection() *influx.InfluxGateway {
influxUrl := getEnv("INFLUX_URL", "http://influxdb:8181")
influxDatabase := getEnv("INFLUX_DATABASE", "provence")
influxToken := getEnv("INFLUX_TOKEN", "")
if influxToken == "" {
if tokenFile := getEnv("INFLUX_TOKEN_FILE", "/run/secrets/admin-token"); tokenFile != "" {
content, err := os.ReadFile(tokenFile)
if err == nil {
influxToken = strings.TrimSpace(string(content))
} else {
log.Printf("[Main] Warning: could not read token file %s: %v\n", tokenFile, err)
}
}
}
if influxToken == "" {
influxToken = "password"
}
log.Printf("[Main] InfluxDB config: URL=%s, DB=%s\n", influxUrl, influxDatabase)
// Create the gateway
gateway, err := influx.NewInfluxGateway(influxUrl, influxToken, influxDatabase)
if err != nil {
log.Fatalf("Creating gateway failed !, %v", err)
}
return gateway
}
// @title Gateway API
// @version 1.0
// @description This is a gateway API for IoT data.
// @host api.db.e.kb28.ch
// @BasePath /api/v1
// @securityDefinitions.basic BasicAuth
func main() {
noMqtt := slices.Contains(os.Args[1:], "--no-mqtt")
// Load mapping configuration (reloaded dynamically on each access)
mappingPath := getEnv("MAPPING_CONFIG_PATH", "mapping.json")
mapping := NewDynamicMapping(mappingPath)
influxGateway := influxConnection()
defer influxGateway.Close()
measurementName := getEnv("CAMPUS", "provence")
if !noMqtt {
mqttGateway := mqttConnection()
defer mqttGateway.Disconnect()
// Create measurement for provence topic
provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName)
// The incoming MQTT topic structure is: <gateway_id>/<node_id>/update
topicStructur := []string{"gateway", "node"}
err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) {
var gatewayID, nodeID string
for _, tag := range dp.Tags() {
switch tag.Subject {
case "gateway":
gatewayID = tag.Content
case "node":
nodeID = tag.Content
}
}
campus, campusOk := mapping.GetCampus(gatewayID)
room, roomOk := mapping.GetRoom(nodeID)
batteryLevel := dp.GetValues().Battery
log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s (Battery %d%%)\n", gatewayID, nodeID, campus, room, batteryLevel)
var influxTags []point.Topic
if !campusOk {
log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID)
influxTags = []point.Topic{
{Subject: "node", Content: nodeID},
}
} else if !roomOk {
log.Printf("[Main] No mapping found for gateway=%s\n", gatewayID)
influxTags = []point.Topic{
{Subject: "campus", Content: campus},
{Subject: "node", Content: nodeID},
}
} else {
influxTags = []point.Topic{
{Subject: "campus", Content: campus},
{Subject: "room", Content: room},
{Subject: "node", Content: nodeID},
}
}
// If CO2PPM value is present and over 1,000,000,000 delete the field, it's calibration value
values := dp.GetValues()
if values.CO2PPM > 1000000000 {
log.Printf("[Main] Warning: CO2PPM value %d is calibrating, setting to 0\n", values.CO2PPM)
values.CO2PPM = 0
}
// Still too high value, something wrong, dropping datapoint
if values.CO2PPM > 10000 {
log.Printf("[Main] Error: CO2PPM value %d is over threshold, dropping Datapoint\n", values.CO2PPM)
return
}
translatedDp := provenceMeasurement.CreateDataPoint(influxTags, values, dp.Timestamp())
if err := influxGateway.AddDatapoint(&translatedDp); err != nil {
log.Printf("[Main] Error adding datapoint to influx: %v\n", err)
}
if err := influxGateway.Flush(); err != nil {
log.Printf("[Main] Error flushing to influx: %v\n", err)
}
})
if err != nil {
log.Fatal(err)
}
} else {
log.Println("[Main] MQTT disabled (--no-mqtt flag set)")
}
// Initialize and start REST Gateway
restUsername := getEnv("REST_USERNAME", "user")
restPassword := getEnv("REST_PASSWORD", "password")
restGateway := rest.NewRestGateway(influxGateway, mapping, measurementName, restUsername, restPassword)
port := getEnv("REST_PORT", "8080")
log.Printf("[Main] Starting REST Gateway on port %s\n", port)
if err := restGateway.Run(":" + port); err != nil {
log.Fatalf("[Main] Failed to start REST Gateway: %v", err)
}
select {}
}