feat(db): add co2 watchdog on each room

Assisted-by: Junie:gemini-3-flash
Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
2026-05-30 20:38:44 +02:00
parent 7c000f3b9c
commit 2b766d3d96
2 changed files with 96 additions and 0 deletions

View File

@@ -96,6 +96,8 @@ services:
- REST_USERNAME=${REST_USERNAME}
- REST_PASSWORD=${REST_PASSWORD}
- MAPPING_CONFIG_PATH=/config/mapping.json
- CO2_THREASHOLD_MAX=1400
- CO2_THREASHOLD_MIN=1000
secrets:
- admin-token
volumes:

View File

@@ -6,11 +6,14 @@ import (
"fmt"
_ "gateway/docs"
"gateway/influx"
"log"
"net/http"
"os"
"regexp"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/apache/arrow-go/v18/arrow"
@@ -35,9 +38,28 @@ type RestGateway struct {
measurementName string
username string
password string
co2ThresholdMax int
co2ThresholdMin int
roomStatus map[string]*RoomCO2Status
statusMu sync.RWMutex
}
type RoomCO2Status struct {
RoomID string `json:"room"`
IsHigh bool `json:"is_high"`
CO2 int `json:"co2"`
}
func NewRestGateway(influxGateway *influx.InfluxGateway, mapping RoomMapper, measurementName string, username, password string) *RestGateway {
maxThreshold, _ := strconv.Atoi(os.Getenv("CO2_THREASHOLD_MAX"))
if maxThreshold == 0 {
maxThreshold = 1400
}
minThreshold, _ := strconv.Atoi(os.Getenv("CO2_THREASHOLD_MIN"))
if minThreshold == 0 {
minThreshold = 1000
}
g := &RestGateway{
influxGateway: influxGateway,
mapping: mapping,
@@ -45,9 +67,13 @@ func NewRestGateway(influxGateway *influx.InfluxGateway, mapping RoomMapper, mea
measurementName: measurementName,
username: username,
password: password,
co2ThresholdMax: maxThreshold,
co2ThresholdMin: minThreshold,
roomStatus: make(map[string]*RoomCO2Status),
}
g.setupRoutes()
go g.runWatchdog()
return g
}
@@ -463,3 +489,71 @@ func (g *RestGateway) getRoomHistory(c *gin.Context) {
c.JSON(http.StatusOK, history)
}
func (g *RestGateway) runWatchdog() {
// Initial check
g.checkCO2()
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
g.checkCO2()
}
}
func (g *RestGateway) checkCO2() {
rooms := g.mapping.Rooms()
for _, roomID := range rooms {
nodes := g.mapping.NodesForRoom(roomID)
if len(nodes) == 0 {
continue
}
nodeFilter := buildNodeFilter(nodes)
// Get the last record for the specific room by matching node IDs, aggregated by 5m intervals
// Same logic as getRoomCurrent
query := fmt.Sprintf(`
SELECT
date_bin(INTERVAL '5 minutes', time)::TIMESTAMP AS time,
ROUND(AVG(co2_ppm)) AS co2_ppm
FROM "%s"
WHERE time > now() - INTERVAL '1 day'
AND %s
GROUP BY date_bin(INTERVAL '5 minutes', time)
ORDER BY time ASC
LIMIT 1
`, g.measurementName, nodeFilter,
)
it, err := g.influxGateway.Query(context.Background(), query)
if err != nil {
log.Printf("[Watchdog] Error querying CO2 for room %s: %v\n", roomID, err)
continue
}
if it.Next() {
val := it.Value()
if co2Val, ok := val["co2_ppm"]; ok {
co2 := int(co2Val.(float64))
g.statusMu.Lock()
status, ok := g.roomStatus[roomID]
if !ok {
status = &RoomCO2Status{RoomID: roomID}
g.roomStatus[roomID] = status
}
status.CO2 = co2
if co2 > g.co2ThresholdMax {
status.IsHigh = true
} else if co2 < g.co2ThresholdMin {
status.IsHigh = false
}
g.statusMu.Unlock()
}
}
if err := it.Err(); err != nil {
log.Printf("[Watchdog] Error in iterator for room %s: %v\n", roomID, err)
}
}
}