feat(db): add SubscribeTyped function and refactor DataPoint structure
Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
@@ -187,3 +187,57 @@ func (g *MqttGateway) Unsubscribe(topic string) error {
|
||||
log.Printf("[MQTT Gateway] Unsubscribed from topic: %s\n", topic)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SubscribeTyped is a helper to subscribe to a topic and automatically convert
|
||||
// the received JSON message to a DataPoint of type T.
|
||||
// T should be a struct or a map that matches the JSON payload (excluding timestamp).
|
||||
// tagSubjects is a list of tag subjects that correspond to the parts of the topic.
|
||||
// For example, if the topic is "provence/B3/update" and tagSubjects is ["city", "room"],
|
||||
// it will create tags {Subject: "city", Content: "provence"} and {Subject: "room", Content: "B3"}.
|
||||
func SubscribeTyped[T any](g *MqttGateway, topic string, m dp.Measurement[T], tagSubjects []string, handler func(dp.DataPoint[T])) error {
|
||||
return g.Subscribe(topic, func(client mqtt.Client, msg mqtt.Message) {
|
||||
// Unmarshal into T for fields
|
||||
var fields T
|
||||
if err := json.Unmarshal(msg.Payload(), &fields); err != nil {
|
||||
log.Printf("[MQTT Gateway] Error unmarshaling fields: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Unmarshal into a map to extract timestamp
|
||||
var raw map[string]any
|
||||
if err := json.Unmarshal(msg.Payload(), &raw); err != nil {
|
||||
log.Printf("[MQTT Gateway] Error unmarshaling raw: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
ts := time.Now()
|
||||
if tsRaw, ok := raw["timestamp"].(string); ok {
|
||||
// Try RFC3339 first (default)
|
||||
if parsedTs, err := time.Parse(time.RFC3339, tsRaw); err == nil {
|
||||
ts = parsedTs
|
||||
} else {
|
||||
log.Printf("[MQTT Gateway] Failed to parse timestamp '%s' as RFC3339: %v", tsRaw, err)
|
||||
}
|
||||
} else if tsRaw, ok := raw["timestamp"].(float64); ok {
|
||||
// Handle Unix timestamp in seconds
|
||||
ts = time.Unix(int64(tsRaw), 0)
|
||||
}
|
||||
|
||||
// Extract tags from topic
|
||||
parts := strings.Split(msg.Topic(), "/")
|
||||
var tags []dp.Topic
|
||||
for i, subject := range tagSubjects {
|
||||
if i < len(parts) && subject != "" {
|
||||
tags = append(tags, dp.Topic{Subject: subject, Content: parts[i]})
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback for backward compatibility if no tagSubjects provided
|
||||
if len(tagSubjects) == 0 && len(parts) > 1 {
|
||||
tags = append(tags, dp.Topic{Subject: "id", Content: parts[1]})
|
||||
}
|
||||
|
||||
dp := m.CreateDataPoint(tags, fields, ts)
|
||||
handler(dp)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
package datapoint
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -31,7 +32,7 @@ type Measurement[T any] struct {
|
||||
type DataPoint[T any] struct {
|
||||
measurement *Measurement[T]
|
||||
tags []Topic
|
||||
values map[string]T
|
||||
values T
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
@@ -43,7 +44,7 @@ func CreateMeasurement[T any](name string) Measurement[T] {
|
||||
|
||||
// CreateDataPoint forces the creation of DataPoint with the type according
|
||||
// to the generic.
|
||||
func (m *Measurement[T]) CreateDataPoint(tags []Topic, values map[string]T, timestamp time.Time) DataPoint[T] {
|
||||
func (m *Measurement[T]) CreateDataPoint(tags []Topic, values T, timestamp time.Time) DataPoint[T] {
|
||||
return DataPoint[T]{
|
||||
measurement: m,
|
||||
tags: tags,
|
||||
@@ -60,21 +61,22 @@ func (dp *DataPoint[T]) Tags() []Topic {
|
||||
return dp.tags
|
||||
}
|
||||
|
||||
func (dp *DataPoint[T]) GetValues() map[string]T {
|
||||
func (dp *DataPoint[T]) GetValues() T {
|
||||
return dp.values
|
||||
}
|
||||
|
||||
// GetValuesAsAny returns the DataPoint value with type any.
|
||||
// PayloadAsAny returns the DataPoint value with type any.
|
||||
// (useful for adding the DataPoint for batching)
|
||||
func (dp *DataPoint[T]) GetValuesAsAny() map[string]any {
|
||||
anyValues := make(map[string]any)
|
||||
|
||||
for k, v := range dp.values {
|
||||
anyValues[k] = v
|
||||
func (dp *DataPoint[T]) PayloadAsAny() map[string]any {
|
||||
b, err := json.Marshal(dp.values)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return anyValues
|
||||
var res map[string]any
|
||||
_ = json.Unmarshal(b, &res)
|
||||
return res
|
||||
}
|
||||
|
||||
func (dp *DataPoint[T]) GetTimestamp() time.Time {
|
||||
func (dp *DataPoint[T]) Timestamp() time.Time {
|
||||
return dp.timestamp
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user