feat(db): add mapping
MQTT topic gateway-id/node-id is now mapped to campus/room for influx Assisted-by: Junie:gemini-3-flash Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
8
db/mapping.json.template
Normal file
8
db/mapping.json.template
Normal file
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"campus": {
|
||||
"provence": ["gw-01"]
|
||||
},
|
||||
"room": {
|
||||
"B3": ["E1:C0:30:15:4E:89", "F9:CE:0C:A4:7C:A4"]
|
||||
}
|
||||
}
|
||||
@@ -94,6 +94,16 @@ func influxConnection() *influx.InfluxGateway {
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Load mapping configuration
|
||||
mappingPath := os.Getenv("MAPPING_CONFIG_PATH")
|
||||
if mappingPath == "" {
|
||||
mappingPath = "mapping.json"
|
||||
}
|
||||
mapping, err := LoadMapping(mappingPath)
|
||||
if err != nil {
|
||||
log.Printf("[Main] Warning: could not load mapping file: %v. Using defaults.", err)
|
||||
mapping = EmptyMapping()
|
||||
}
|
||||
|
||||
mqttGateway := mqttConnection()
|
||||
defer mqttGateway.Disconnect()
|
||||
@@ -101,13 +111,44 @@ func main() {
|
||||
influxGateway := influxConnection()
|
||||
defer influxGateway.Close()
|
||||
|
||||
// Create measurement for provence topic
|
||||
provenceMeasurement := point.CreateMeasurement[ProvenceData]("provence")
|
||||
measurementName := os.Getenv("CAMPUS")
|
||||
if measurementName == "" {
|
||||
measurementName = "provence"
|
||||
}
|
||||
|
||||
tagSubjects := []string{"city", "room"}
|
||||
err := mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, tagSubjects, func(dp point.DataPoint[ProvenceData]) {
|
||||
log.Printf("Received dp: %+v\n", dp)
|
||||
if err := influxGateway.AddDatapoint(&dp); err != nil {
|
||||
// Create measurement for provence topic
|
||||
provenceMeasurement := point.CreateMeasurement[ProvenceData](measurementName)
|
||||
// The incoming MQTT topic structure is: <gateway_id>/<node_id>/update
|
||||
topicStructur := []string{"gateway", "node"}
|
||||
err = mqtt.SubscribeTyped(mqttGateway, "+/+/update", provenceMeasurement, topicStructur, func(dp point.DataPoint[ProvenceData]) {
|
||||
var gatewayID, nodeID string
|
||||
for _, tag := range dp.Tags() {
|
||||
switch tag.Subject {
|
||||
case "gateway":
|
||||
gatewayID = tag.Content
|
||||
case "node":
|
||||
nodeID = tag.Content
|
||||
}
|
||||
}
|
||||
|
||||
campus, campusOk := mapping.GetCampus(gatewayID)
|
||||
room, roomOk := mapping.GetRoom(nodeID)
|
||||
|
||||
if !campusOk || !roomOk {
|
||||
log.Printf("[Main] No mapping found for gateway=%s node=%s, dropping datapoint\n", gatewayID, nodeID)
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("[Main] Received gateway=%s node=%s -> campus=%s room=%s\n", gatewayID, nodeID, campus, room)
|
||||
|
||||
// Re-create the datapoint with campus/room tags for Influx
|
||||
influxTags := []point.Topic{
|
||||
{Subject: "campus", Content: campus},
|
||||
{Subject: "room", Content: room},
|
||||
}
|
||||
translatedDp := provenceMeasurement.CreateDataPoint(influxTags, dp.GetValues(), dp.Timestamp())
|
||||
|
||||
if err := influxGateway.AddDatapoint(&translatedDp); err != nil {
|
||||
log.Printf("[Main] Error adding datapoint to influx: %v\n", err)
|
||||
}
|
||||
if err := influxGateway.Flush(); err != nil {
|
||||
@@ -119,7 +160,7 @@ func main() {
|
||||
}
|
||||
|
||||
// Initialize and start REST Gateway
|
||||
restGateway := rest.NewRestGateway(influxGateway)
|
||||
restGateway := rest.NewRestGateway(influxGateway, measurementName)
|
||||
|
||||
port, ok := os.LookupEnv("REST_PORT")
|
||||
if !ok {
|
||||
|
||||
80
db/src/mapping.go
Normal file
80
db/src/mapping.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
// mappingFile is the structure of the JSON config file.
|
||||
// Campus names map to the list of gateway IDs that belong to them.
|
||||
// Room names map to the list of node IDs that belong to them.
|
||||
type mappingFile struct {
|
||||
Campus map[string][]string `json:"campus"`
|
||||
Room map[string][]string `json:"room"`
|
||||
}
|
||||
|
||||
// MappingConfig holds the reverse lookup maps built from the config file:
|
||||
//
|
||||
// gateway_id -> campus name
|
||||
// node_id -> room name
|
||||
type MappingConfig struct {
|
||||
gatewayToCampus map[string]string
|
||||
nodeToRoom map[string]string
|
||||
}
|
||||
|
||||
// EmptyMapping returns a MappingConfig with no entries.
|
||||
// GetCampus and GetRoom will return their "unknown_*" fallback values.
|
||||
func EmptyMapping() *MappingConfig {
|
||||
return &MappingConfig{
|
||||
gatewayToCampus: make(map[string]string),
|
||||
nodeToRoom: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
// LoadMapping reads the mapping configuration from a JSON file and builds
|
||||
// the internal reverse-lookup tables.
|
||||
func LoadMapping(path string) (*MappingConfig, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read mapping file %q: %w", path, err)
|
||||
}
|
||||
|
||||
var raw mappingFile
|
||||
if err := json.Unmarshal(data, &raw); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse mapping JSON: %w", err)
|
||||
}
|
||||
|
||||
cfg := &MappingConfig{
|
||||
gatewayToCampus: make(map[string]string),
|
||||
nodeToRoom: make(map[string]string),
|
||||
}
|
||||
|
||||
for campus, gateways := range raw.Campus {
|
||||
for _, gwID := range gateways {
|
||||
cfg.gatewayToCampus[gwID] = campus
|
||||
}
|
||||
}
|
||||
|
||||
for room, nodes := range raw.Room {
|
||||
for _, nodeID := range nodes {
|
||||
cfg.nodeToRoom[nodeID] = room
|
||||
}
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// GetCampus returns the campus name for a given gateway ID.
|
||||
// The boolean is false if the gateway ID has no mapping.
|
||||
func (c *MappingConfig) GetCampus(gatewayID string) (string, bool) {
|
||||
campus, ok := c.gatewayToCampus[gatewayID]
|
||||
return campus, ok
|
||||
}
|
||||
|
||||
// GetRoom returns the room name for a given node ID.
|
||||
// The boolean is false if the node ID has no mapping.
|
||||
func (c *MappingConfig) GetRoom(nodeID string) (string, bool) {
|
||||
room, ok := c.nodeToRoom[nodeID]
|
||||
return room, ok
|
||||
}
|
||||
@@ -10,14 +10,16 @@ import (
|
||||
)
|
||||
|
||||
type RestGateway struct {
|
||||
influxGateway *influx.InfluxGateway
|
||||
engine *gin.Engine
|
||||
influxGateway *influx.InfluxGateway
|
||||
engine *gin.Engine
|
||||
measurementName string
|
||||
}
|
||||
|
||||
func NewRestGateway(influxGateway *influx.InfluxGateway) *RestGateway {
|
||||
func NewRestGateway(influxGateway *influx.InfluxGateway, measurementName string) *RestGateway {
|
||||
g := &RestGateway{
|
||||
influxGateway: influxGateway,
|
||||
engine: gin.Default(),
|
||||
influxGateway: influxGateway,
|
||||
engine: gin.Default(),
|
||||
measurementName: measurementName,
|
||||
}
|
||||
|
||||
g.setupRoutes()
|
||||
@@ -39,8 +41,8 @@ func (g *RestGateway) Run(addr string) error {
|
||||
|
||||
// GET /api/v1/rooms
|
||||
func (g *RestGateway) getRooms(c *gin.Context) {
|
||||
// Query unique rooms from the "provence" measurement
|
||||
query := `SELECT DISTINCT("room") FROM "provence"`
|
||||
// Query unique rooms from the measurement
|
||||
query := fmt.Sprintf(`SELECT DISTINCT("room") FROM "%s"`, g.measurementName)
|
||||
|
||||
// Using context.Background() as seen in working snippet
|
||||
it, err := g.influxGateway.Query(context.Background(), query)
|
||||
@@ -70,7 +72,7 @@ func (g *RestGateway) getRoomCurrent(c *gin.Context) {
|
||||
roomID := c.Param("room-id")
|
||||
|
||||
// Get the last record for the specific room
|
||||
query := fmt.Sprintf(`SELECT * FROM "provence" WHERE "room" = '%s' ORDER BY time DESC LIMIT 1`, roomID)
|
||||
query := fmt.Sprintf(`SELECT * FROM "%s" WHERE "room" = '%s' ORDER BY time DESC LIMIT 1`, g.measurementName, roomID)
|
||||
|
||||
// Using context.Background() as seen in working snippet
|
||||
it, err := g.influxGateway.Query(context.Background(), query)
|
||||
@@ -97,8 +99,8 @@ func (g *RestGateway) getRoomHistory(c *gin.Context) {
|
||||
roomID := c.Param("room-id")
|
||||
|
||||
// Default to last 24 hours if not specified
|
||||
query := fmt.Sprintf(` SELECT * FROM "provence" WHERE "room" = '%s' AND time > now() - INTERVAL '1 day' ORDER BY time ASC
|
||||
`, roomID)
|
||||
query := fmt.Sprintf(` SELECT * FROM "%s" WHERE "room" = '%s' AND time > now() - INTERVAL '1 day' ORDER BY time ASC
|
||||
`, g.measurementName, roomID)
|
||||
|
||||
// Using context.Background() as seen in working snippet
|
||||
it, err := g.influxGateway.Query(context.Background(), query)
|
||||
|
||||
Reference in New Issue
Block a user