diff --git a/db/docker-compose.yml b/db/docker-compose.yml index d78d35a..965314a 100644 --- a/db/docker-compose.yml +++ b/db/docker-compose.yml @@ -96,6 +96,8 @@ services: - REST_USERNAME=${REST_USERNAME} - REST_PASSWORD=${REST_PASSWORD} - MAPPING_CONFIG_PATH=/config/mapping.json + - CO2_THREASHOLD_MAX=1400 + - CO2_THREASHOLD_MIN=1000 secrets: - admin-token volumes: diff --git a/db/src/rest/rest.go b/db/src/rest/rest.go index b0fa8d8..90b440c 100644 --- a/db/src/rest/rest.go +++ b/db/src/rest/rest.go @@ -6,11 +6,14 @@ import ( "fmt" _ "gateway/docs" "gateway/influx" + "log" "net/http" + "os" "regexp" "slices" "strconv" "strings" + "sync" "time" "github.com/apache/arrow-go/v18/arrow" @@ -35,9 +38,28 @@ type RestGateway struct { measurementName string username string password string + co2ThresholdMax int + co2ThresholdMin int + roomStatus map[string]*RoomCO2Status + statusMu sync.RWMutex +} + +type RoomCO2Status struct { + RoomID string `json:"room"` + IsHigh bool `json:"is_high"` + CO2 int `json:"co2"` } func NewRestGateway(influxGateway *influx.InfluxGateway, mapping RoomMapper, measurementName string, username, password string) *RestGateway { + maxThreshold, _ := strconv.Atoi(os.Getenv("CO2_THREASHOLD_MAX")) + if maxThreshold == 0 { + maxThreshold = 1400 + } + minThreshold, _ := strconv.Atoi(os.Getenv("CO2_THREASHOLD_MIN")) + if minThreshold == 0 { + minThreshold = 1000 + } + g := &RestGateway{ influxGateway: influxGateway, mapping: mapping, @@ -45,9 +67,13 @@ func NewRestGateway(influxGateway *influx.InfluxGateway, mapping RoomMapper, mea measurementName: measurementName, username: username, password: password, + co2ThresholdMax: maxThreshold, + co2ThresholdMin: minThreshold, + roomStatus: make(map[string]*RoomCO2Status), } g.setupRoutes() + go g.runWatchdog() return g } @@ -463,3 +489,71 @@ func (g *RestGateway) getRoomHistory(c *gin.Context) { c.JSON(http.StatusOK, history) } + +func (g *RestGateway) runWatchdog() { + // Initial check + g.checkCO2() + + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for range ticker.C { + g.checkCO2() + } +} + +func (g *RestGateway) checkCO2() { + rooms := g.mapping.Rooms() + for _, roomID := range rooms { + nodes := g.mapping.NodesForRoom(roomID) + if len(nodes) == 0 { + continue + } + nodeFilter := buildNodeFilter(nodes) + + // Get the last record for the specific room by matching node IDs, aggregated by 5m intervals + // Same logic as getRoomCurrent + query := fmt.Sprintf(` + SELECT + date_bin(INTERVAL '5 minutes', time)::TIMESTAMP AS time, + ROUND(AVG(co2_ppm)) AS co2_ppm + FROM "%s" + WHERE time > now() - INTERVAL '1 day' + AND %s + GROUP BY date_bin(INTERVAL '5 minutes', time) + ORDER BY time ASC + LIMIT 1 + `, g.measurementName, nodeFilter, + ) + + it, err := g.influxGateway.Query(context.Background(), query) + if err != nil { + log.Printf("[Watchdog] Error querying CO2 for room %s: %v\n", roomID, err) + continue + } + + if it.Next() { + val := it.Value() + if co2Val, ok := val["co2_ppm"]; ok { + co2 := int(co2Val.(float64)) + + g.statusMu.Lock() + status, ok := g.roomStatus[roomID] + if !ok { + status = &RoomCO2Status{RoomID: roomID} + g.roomStatus[roomID] = status + } + status.CO2 = co2 + + if co2 > g.co2ThresholdMax { + status.IsHigh = true + } else if co2 < g.co2ThresholdMin { + status.IsHigh = false + } + g.statusMu.Unlock() + } + } + if err := it.Err(); err != nil { + log.Printf("[Watchdog] Error in iterator for room %s: %v\n", roomID, err) + } + } +}