diff --git a/db/mapping.json.template b/db/mapping.json.template new file mode 100644 index 0000000..b318277 --- /dev/null +++ b/db/mapping.json.template @@ -0,0 +1,8 @@ +{ + "campus": { + "provence": ["gw-01"] + }, + "room": { + "B3": ["E1:C0:30:15:4E:89", "F9:CE:0C:A4:7C:A4"] + } +} diff --git a/db/src/main.go b/db/src/main.go index f56c74f..f6762da 100644 --- a/db/src/main.go +++ b/db/src/main.go @@ -94,6 +94,16 @@ func influxConnection() *influx.InfluxGateway { } func main() { + // Load mapping configuration + mappingPath := os.Getenv("MAPPING_CONFIG_PATH") + if mappingPath == "" { + mappingPath = "mapping.json" + } + mapping, err := LoadMapping(mappingPath) + if err != nil { + log.Printf("[Main] Warning: could not load mapping file: %v. Using defaults.", err) + mapping = EmptyMapping() + } mqttGateway := mqttConnection() defer mqttGateway.Disconnect() @@ -101,13 +111,44 @@ func main() { influxGateway := influxConnection() defer influxGateway.Close() - // Create measurement for provence topic - provenceMeasurement := point.CreateMeasurement[ProvenceData]("provence") + measurementName := os.Getenv("CAMPUS") + if measurementName == "" { + measurementName = "provence" + } - tagSubjects := []string{"city", "room"} - err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, tagSubjects, func(dp point.DataPoint[ProvenceData]) { - log.Printf("Received dp: %+v\n", dp) - if err := influxGateway.AddDatapoint(&dp); err != nil { + // Create measurement for provence topic + provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName) + // The incoming MQTT topic structure is: //update + topicStructur := []string{"gateway", "node"} + err = mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) { + var gatewayID, nodeID string + for _, tag := range dp.Tags() { + switch tag.Subject { + case "gateway": + gatewayID = tag.Content + case "node": + nodeID = tag.Content + } + } + + campus, campusOk := mapping.GetCampus(gatewayID) + room, roomOk := mapping.GetRoom(nodeID) + + if !campusOk || !roomOk { + log.Printf("[Main] No mapping found for gateway=%s node=%s, dropping datapoint\n", gatewayID, nodeID) + return + } + + log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s\n", gatewayID, nodeID, campus, room) + + // Re-create the datapoint with campus/room tags for Influx + influxTags := []point.Topic{ + {Subject: "campus", Content: campus}, + {Subject: "room", Content: room}, + } + translatedDp := provenceMeasurement.CreateDataPoint(influxTags, dp.GetValues(), dp.Timestamp()) + + if err := influxGateway.AddDatapoint(&translatedDp); err != nil { log.Printf("[Main] Error adding datapoint to influx: %v\n", err) } if err := influxGateway.Flush(); err != nil { @@ -119,7 +160,7 @@ func main() { } // Initialize and start REST Gateway - restGateway := rest.NewRestGateway(influxGateway) + restGateway := rest.NewRestGateway(influxGateway, measurementName) port, ok := os.LookupEnv("REST_PORT") if !ok { diff --git a/db/src/mapping.go b/db/src/mapping.go new file mode 100644 index 0000000..2e0ceac --- /dev/null +++ b/db/src/mapping.go @@ -0,0 +1,80 @@ +package main + +import ( + "encoding/json" + "fmt" + "os" +) + +// mappingFile is the structure of the JSON config file. +// Campus names map to the list of gateway IDs that belong to them. +// Room names map to the list of node IDs that belong to them. +type mappingFile struct { + Campus map[string][]string `json:"campus"` + Room map[string][]string `json:"room"` +} + +// MappingConfig holds the reverse lookup maps built from the config file: +// +// gateway_id -> campus name +// node_id -> room name +type MappingConfig struct { + gatewayToCampus map[string]string + nodeToRoom map[string]string +} + +// EmptyMapping returns a MappingConfig with no entries. +// GetCampus and GetRoom will return their "unknown_*" fallback values. +func EmptyMapping() *MappingConfig { + return &MappingConfig{ + gatewayToCampus: make(map[string]string), + nodeToRoom: make(map[string]string), + } +} + +// LoadMapping reads the mapping configuration from a JSON file and builds +// the internal reverse-lookup tables. +func LoadMapping(path string) (*MappingConfig, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read mapping file %q: %w", path, err) + } + + var raw mappingFile + if err := json.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("failed to parse mapping JSON: %w", err) + } + + cfg := &MappingConfig{ + gatewayToCampus: make(map[string]string), + nodeToRoom: make(map[string]string), + } + + for campus, gateways := range raw.Campus { + for _, gwID := range gateways { + cfg.gatewayToCampus[gwID] = campus + } + } + + for room, nodes := range raw.Room { + for _, nodeID := range nodes { + cfg.nodeToRoom[nodeID] = room + } + } + + return cfg, nil +} + +// GetCampus returns the campus name for a given gateway ID. +// The boolean is false if the gateway ID has no mapping. +func (c *MappingConfig) GetCampus(gatewayID string) (string, bool) { + campus, ok := c.gatewayToCampus[gatewayID] + return campus, ok +} + +// GetRoom returns the room name for a given node ID. +// The boolean is false if the node ID has no mapping. +func (c *MappingConfig) GetRoom(nodeID string) (string, bool) { + room, ok := c.nodeToRoom[nodeID] + return room, ok +} diff --git a/db/src/rest/rest.go b/db/src/rest/rest.go index 5cd9b5d..f8bf2bc 100644 --- a/db/src/rest/rest.go +++ b/db/src/rest/rest.go @@ -10,14 +10,16 @@ import ( ) type RestGateway struct { - influxGateway *influx.InfluxGateway - engine *gin.Engine + influxGateway *influx.InfluxGateway + engine *gin.Engine + measurementName string } -func NewRestGateway(influxGateway *influx.InfluxGateway) *RestGateway { +func NewRestGateway(influxGateway *influx.InfluxGateway, measurementName string) *RestGateway { g := &RestGateway{ - influxGateway: influxGateway, - engine: gin.Default(), + influxGateway: influxGateway, + engine: gin.Default(), + measurementName: measurementName, } g.setupRoutes() @@ -39,8 +41,8 @@ func (g *RestGateway) Run(addr string) error { // GET /api/v1/rooms func (g *RestGateway) getRooms(c *gin.Context) { - // Query unique rooms from the "provence" measurement - query := `SELECT DISTINCT("room") FROM "provence"` + // Query unique rooms from the measurement + query := fmt.Sprintf(`SELECT DISTINCT("room") FROM "%s"`, g.measurementName) // Using context.Background() as seen in working snippet it, err := g.influxGateway.Query(context.Background(), query) @@ -70,7 +72,7 @@ func (g *RestGateway) getRoomCurrent(c *gin.Context) { roomID := c.Param("room-id") // Get the last record for the specific room - query := fmt.Sprintf(`SELECT * FROM "provence" WHERE "room" = '%s' ORDER BY time DESC LIMIT 1`, roomID) + query := fmt.Sprintf(`SELECT * FROM "%s" WHERE "room" = '%s' ORDER BY time DESC LIMIT 1`, g.measurementName, roomID) // Using context.Background() as seen in working snippet it, err := g.influxGateway.Query(context.Background(), query) @@ -97,8 +99,8 @@ func (g *RestGateway) getRoomHistory(c *gin.Context) { roomID := c.Param("room-id") // Default to last 24 hours if not specified - query := fmt.Sprintf(` SELECT * FROM "provence" WHERE "room" = '%s' AND time > now() - INTERVAL '1 day' ORDER BY time ASC -`, roomID) + query := fmt.Sprintf(` SELECT * FROM "%s" WHERE "room" = '%s' AND time > now() - INTERVAL '1 day' ORDER BY time ASC +`, g.measurementName, roomID) // Using context.Background() as seen in working snippet it, err := g.influxGateway.Query(context.Background(), query)