diff --git a/db/.env.template b/db/.env.template index c64dd27..4f21e69 100644 --- a/db/.env.template +++ b/db/.env.template @@ -1,4 +1,5 @@ INFLUX_PORT=8181 UI_PORT=8093 +MQTT_BROKER_URL=tls://mqtt.e.kb28.ch:8883 MQTT_USERNAME= MQTT_PASSWORD= diff --git a/db/src/main.go b/db/src/main.go new file mode 100644 index 0000000..01b6353 --- /dev/null +++ b/db/src/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "fmt" + "gateway/influx" + "gateway/mqtt" + point "gateway/point" + "log" + "os" + "time" +) + +type ProvenceData struct { + CO2PPM int `json:"co2_ppm"` + Temp float64 `json:"temp"` + Humidity int `json:"humidity"` + Window bool `json:"window"` +} + +func mqttConnection() *mqtt.MqttGateway { + BrokerUrl, ok := os.LookupEnv("MQTT_BROKER_URL") + if !ok { + BrokerUrl = "tls://localhost:8883" + } + + Username, ok := os.LookupEnv("MQTT_USERNAME") + if !ok { + Username = "user" + } + + Password, ok := os.LookupEnv("MQTT_PASSWORD") + if !ok { + Password = "password" + } + ClientId := "mqtt-gateway-test-client_" + fmt.Sprint(time.Now().Unix()) + + // Create config & gateway + mqttP := &mqtt.MqttParams{ + Broker: BrokerUrl, + ClientId: ClientId, + Qos: 1, + Username: Username, + Password: Password, + TlsConfig: nil, + OnConnect: nil, + OnConnectionLost: nil, + Timeout: 1 * time.Second, + } + gateway, err := mqtt.NewMqttGateway(*mqttP) + if err != nil { + log.Fatal(err) + } + + return gateway +} + +func influxConnection() *influx.InfluxGateway { + influxUrl, ok := os.LookupEnv("INFLUX_URL") + if !ok { + influxUrl = "https://db.e.kb28.ch:443" + } + influxDatabase, ok := os.LookupEnv("INFLUX_DATABASE") + if !ok { + influxDatabase = "provence" + } + + influxToken, ok := os.LookupEnv("INFLUX_TOKEN") + if !ok { + influxToken = "password" + } + + log.Printf("[Main] InfluxDB config: URL=%s, DB=%s, Org=%s, Timeout=%v\n", influxUrl, influxDatabase) + + // Create the gateway + gateway, err := influx.NewInfluxGateway(influxUrl, influxToken, influxDatabase) + if err != nil { + log.Fatalf("Creating gateway failed !, %v", err) + } + + return gateway +} + +func main() { + + mqttGateway := mqttConnection() + defer mqttGateway.Disconnect() + + influxGateway := influxConnection() + defer influxGateway.Close() + + // Create measurement for provence topic + provenceMeasurement := point.CreateMeasurement[ProvenceData]("provence") + + tagSubjects := []string{"city", "room"} + err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, tagSubjects, func(dp point.DataPoint[ProvenceData]) { + log.Printf("Received dp: %+v\n", dp) + if err := influxGateway.AddDatapoint(&dp); err != nil { + log.Printf("[Main] Error adding datapoint to influx: %v\n", err) + } + if err := influxGateway.Flush(); err != nil { + log.Printf("[Main] Error flushing to influx: %v\n", err) + } + }) + if err != nil { + log.Fatal(err) + } + + // Keep the application running to receive messages + select {} +} diff --git a/db/traefik.yml b/db/traefik.yml index aa50004..df0fec8 100644 --- a/db/traefik.yml +++ b/db/traefik.yml @@ -10,15 +10,22 @@ http: # ClientId: "" # ClientSecret: "" routers: + pi-db-ui: + rule: "Host(`ui.db.e.kb28.ch`)" + entryPoints: + - websecure + service: pi-db-ui + tls: + certResolver: letsencrypt + # middlewares: + # - oidc-auth-pi-db@file pi-db: - rule: "Host(`ui.e.kb28.ch`)" + rule: "Host(`db.e.kb28.ch`)" entryPoints: - websecure service: pi-db tls: certResolver: letsencrypt - # middlewares: - # - oidc-auth-pi-db@file pi-mqtt-management: rule: "Host(`mqtt.e.kb28.ch`)" entryPoints: @@ -28,11 +35,16 @@ http: certResolver: letsencrypt services: - pi-db: + pi-db-ui: loadBalancer: servers: - url: "http://192.168.42.211:8093" passHostHeader: true + pi-db: + loadBalancer: + servers: + - url: "http://192.168.42.211:8181" + passHostHeader: true pi-mqtt-management: loadBalancer: servers: