feat(db): add initial main implementation

- connect to MQTTS broker
- connect to Influx DB
- subscribe to +/+/update MQTT topic and send receive message to influx
- add traefik config for ui and db

Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
2026-04-30 17:44:48 +02:00
parent 0086a31f73
commit 3587e10671
3 changed files with 127 additions and 4 deletions

View File

@@ -1,4 +1,5 @@
INFLUX_PORT=8181 INFLUX_PORT=8181
UI_PORT=8093 UI_PORT=8093
MQTT_BROKER_URL=tls://mqtt.e.kb28.ch:8883
MQTT_USERNAME= MQTT_USERNAME=
MQTT_PASSWORD= MQTT_PASSWORD=

110
db/src/main.go Normal file
View File

@@ -0,0 +1,110 @@
package main
import (
"fmt"
"gateway/influx"
"gateway/mqtt"
point "gateway/point"
"log"
"os"
"time"
)
type ProvenceData struct {
CO2PPM int `json:"co2_ppm"`
Temp float64 `json:"temp"`
Humidity int `json:"humidity"`
Window bool `json:"window"`
}
func mqttConnection() *mqtt.MqttGateway {
BrokerUrl, ok := os.LookupEnv("MQTT_BROKER_URL")
if !ok {
BrokerUrl = "tls://localhost:8883"
}
Username, ok := os.LookupEnv("MQTT_USERNAME")
if !ok {
Username = "user"
}
Password, ok := os.LookupEnv("MQTT_PASSWORD")
if !ok {
Password = "password"
}
ClientId := "mqtt-gateway-test-client_" + fmt.Sprint(time.Now().Unix())
// Create config & gateway
mqttP := &mqtt.MqttParams{
Broker: BrokerUrl,
ClientId: ClientId,
Qos: 1,
Username: Username,
Password: Password,
TlsConfig: nil,
OnConnect: nil,
OnConnectionLost: nil,
Timeout: 1 * time.Second,
}
gateway, err := mqtt.NewMqttGateway(*mqttP)
if err != nil {
log.Fatal(err)
}
return gateway
}
func influxConnection() *influx.InfluxGateway {
influxUrl, ok := os.LookupEnv("INFLUX_URL")
if !ok {
influxUrl = "https://db.e.kb28.ch:443"
}
influxDatabase, ok := os.LookupEnv("INFLUX_DATABASE")
if !ok {
influxDatabase = "provence"
}
influxToken, ok := os.LookupEnv("INFLUX_TOKEN")
if !ok {
influxToken = "password"
}
log.Printf("[Main] InfluxDB config: URL=%s, DB=%s, Org=%s, Timeout=%v\n", influxUrl, influxDatabase)
// Create the gateway
gateway, err := influx.NewInfluxGateway(influxUrl, influxToken, influxDatabase)
if err != nil {
log.Fatalf("Creating gateway failed !, %v", err)
}
return gateway
}
func main() {
mqttGateway := mqttConnection()
defer mqttGateway.Disconnect()
influxGateway := influxConnection()
defer influxGateway.Close()
// Create measurement for provence topic
provenceMeasurement := point.CreateMeasurement[ProvenceData]("provence")
tagSubjects := []string{"city", "room"}
err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, tagSubjects, func(dp point.DataPoint[ProvenceData]) {
log.Printf("Received dp: %+v\n", dp)
if err := influxGateway.AddDatapoint(&dp); err != nil {
log.Printf("[Main] Error adding datapoint to influx: %v\n", err)
}
if err := influxGateway.Flush(); err != nil {
log.Printf("[Main] Error flushing to influx: %v\n", err)
}
})
if err != nil {
log.Fatal(err)
}
// Keep the application running to receive messages
select {}
}

View File

@@ -10,15 +10,22 @@ http:
# ClientId: "" # ClientId: ""
# ClientSecret: "" # ClientSecret: ""
routers: routers:
pi-db-ui:
rule: "Host(`ui.db.e.kb28.ch`)"
entryPoints:
- websecure
service: pi-db-ui
tls:
certResolver: letsencrypt
# middlewares:
# - oidc-auth-pi-db@file
pi-db: pi-db:
rule: "Host(`ui.e.kb28.ch`)" rule: "Host(`db.e.kb28.ch`)"
entryPoints: entryPoints:
- websecure - websecure
service: pi-db service: pi-db
tls: tls:
certResolver: letsencrypt certResolver: letsencrypt
# middlewares:
# - oidc-auth-pi-db@file
pi-mqtt-management: pi-mqtt-management:
rule: "Host(`mqtt.e.kb28.ch`)" rule: "Host(`mqtt.e.kb28.ch`)"
entryPoints: entryPoints:
@@ -28,11 +35,16 @@ http:
certResolver: letsencrypt certResolver: letsencrypt
services: services:
pi-db: pi-db-ui:
loadBalancer: loadBalancer:
servers: servers:
- url: "http://192.168.42.211:8093" - url: "http://192.168.42.211:8093"
passHostHeader: true passHostHeader: true
pi-db:
loadBalancer:
servers:
- url: "http://192.168.42.211:8181"
passHostHeader: true
pi-mqtt-management: pi-mqtt-management:
loadBalancer: loadBalancer:
servers: servers: