diff --git a/db/src/mqtt/mqtt.go b/db/src/mqtt/mqtt.go index fbca826..05d8e46 100644 --- a/db/src/mqtt/mqtt.go +++ b/db/src/mqtt/mqtt.go @@ -187,3 +187,57 @@ func (g *MqttGateway) Unsubscribe(topic string) error { log.Printf("[MQTT Gateway] Unsubscribed from topic: %s\n", topic) return nil } + +// SubscribeTyped is a helper to subscribe to a topic and automatically convert +// the received JSON message to a DataPoint of type T. +// T should be a struct or a map that matches the JSON payload (excluding timestamp). +// tagSubjects is a list of tag subjects that correspond to the parts of the topic. +// For example, if the topic is "provence/B3/update" and tagSubjects is ["city", "room"], +// it will create tags {Subject: "city", Content: "provence"} and {Subject: "room", Content: "B3"}. +func SubscribeTyped[T any](g *MqttGateway, topic string, m dp.Measurement[T], tagSubjects []string, handler func(dp.DataPoint[T])) error { + return g.Subscribe(topic, func(client mqtt.Client, msg mqtt.Message) { + // Unmarshal into T for fields + var fields T + if err := json.Unmarshal(msg.Payload(), &fields); err != nil { + log.Printf("[MQTT Gateway] Error unmarshaling fields: %v", err) + return + } + + // Unmarshal into a map to extract timestamp + var raw map[string]any + if err := json.Unmarshal(msg.Payload(), &raw); err != nil { + log.Printf("[MQTT Gateway] Error unmarshaling raw: %v", err) + return + } + + ts := time.Now() + if tsRaw, ok := raw["timestamp"].(string); ok { + // Try RFC3339 first (default) + if parsedTs, err := time.Parse(time.RFC3339, tsRaw); err == nil { + ts = parsedTs + } else { + log.Printf("[MQTT Gateway] Failed to parse timestamp '%s' as RFC3339: %v", tsRaw, err) + } + } else if tsRaw, ok := raw["timestamp"].(float64); ok { + // Handle Unix timestamp in seconds + ts = time.Unix(int64(tsRaw), 0) + } + + // Extract tags from topic + parts := strings.Split(msg.Topic(), "/") + var tags []dp.Topic + for i, subject := range tagSubjects { + if i < len(parts) && subject != "" { + tags = append(tags, dp.Topic{Subject: subject, Content: parts[i]}) + } + } + + // Fallback for backward compatibility if no tagSubjects provided + if len(tagSubjects) == 0 && len(parts) > 1 { + tags = append(tags, dp.Topic{Subject: "id", Content: parts[1]}) + } + + dp := m.CreateDataPoint(tags, fields, ts) + handler(dp) + }) +} diff --git a/db/src/point/datapoint.go b/db/src/point/datapoint.go index 599c01d..03c407e 100644 --- a/db/src/point/datapoint.go +++ b/db/src/point/datapoint.go @@ -3,6 +3,7 @@ package datapoint import ( + "encoding/json" "time" ) @@ -31,7 +32,7 @@ type Measurement[T any] struct { type DataPoint[T any] struct { measurement *Measurement[T] tags []Topic - values map[string]T + values T timestamp time.Time } @@ -43,7 +44,7 @@ func CreateMeasurement[T any](name string) Measurement[T] { // CreateDataPoint forces the creation of DataPoint with the type according // to the generic. -func (m *Measurement[T]) CreateDataPoint(tags []Topic, values map[string]T, timestamp time.Time) DataPoint[T] { +func (m *Measurement[T]) CreateDataPoint(tags []Topic, values T, timestamp time.Time) DataPoint[T] { return DataPoint[T]{ measurement: m, tags: tags, @@ -60,21 +61,22 @@ func (dp *DataPoint[T]) Tags() []Topic { return dp.tags } -func (dp *DataPoint[T]) GetValues() map[string]T { +func (dp *DataPoint[T]) GetValues() T { return dp.values } -// GetValuesAsAny returns the DataPoint value with type any. +// PayloadAsAny returns the DataPoint value with type any. // (useful for adding the DataPoint for batching) -func (dp *DataPoint[T]) GetValuesAsAny() map[string]any { - anyValues := make(map[string]any) - - for k, v := range dp.values { - anyValues[k] = v +func (dp *DataPoint[T]) PayloadAsAny() map[string]any { + b, err := json.Marshal(dp.values) + if err != nil { + return nil } - return anyValues + var res map[string]any + _ = json.Unmarshal(b, &res) + return res } -func (dp *DataPoint[T]) GetTimestamp() time.Time { +func (dp *DataPoint[T]) Timestamp() time.Time { return dp.timestamp }