feat(db): merge datapoint and message

Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
2026-04-30 13:27:44 +02:00
parent 1fb294f495
commit 979c502e27
5 changed files with 74 additions and 70 deletions

37
db/src/go.mod Normal file
View File

@@ -0,0 +1,37 @@
module gateway
go 1.24.0
require (
github.com/InfluxCommunity/influxdb3-go/v2 v2.13.0
github.com/eclipse/paho.mqtt.golang v1.5.1
github.com/stretchr/testify v1.11.1
)
require (
github.com/apache/arrow-go/v18 v18.5.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
github.com/klauspost/compress v1.18.2 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/pierrec/lz4/v4 v4.1.23 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
golang.org/x/mod v0.32.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.40.0 // indirect
golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/tools v0.41.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/grpc v1.79.1 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -55,12 +55,16 @@ func NewInfluxGateway(url string, token string, database string) (*InfluxGateway
// It pushes the batch of point when the number of points >= batch size.
func (g *InfluxGateway) AddDatapoint(dp datapoint.DataPointInfo) error {
tagsList := map[string]string{}
for _, t := range dp.Tags() {
tagsList[t.Subject] = t.Content
}
g.batcher.Add(
influxdb3.NewPoint(
dp.GetMeasurementName(),
dp.GetTags(),
dp.GetValuesAsAny(),
dp.GetTimestamp(),
dp.MeasurementName(),
tagsList,
dp.PayloadAsAny(),
dp.Timestamp(),
),
)

View File

@@ -6,8 +6,9 @@ import (
"encoding/json"
"errors"
"fmt"
data "gateway/point"
dp "gateway/point"
"log"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
@@ -48,6 +49,14 @@ var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err
log.Printf("[MQTT Gateway] Connection lost: %v\n", err)
}
func getTopic(t []dp.Topic) string {
var topic []string
for _, t := range t {
topic = append(topic, t.Content)
}
return strings.Join(topic, "/")
}
// NewMqttGateway creates a new MqttGateway with the given parameters.
// And establishes the connection
func NewMqttGateway(p MqttParams) (*MqttGateway, error) {
@@ -111,8 +120,9 @@ func NewMqttGateway(p MqttParams) (*MqttGateway, error) {
// SendData is used to send data in the MQTT gateway.
// It uses the DataPointInfo interface for abstracting the generic type of the DataPoint
func (g *MqttGateway) SendData(msg data.MessageInfo) error {
topic := msg.Topic()
func (g *MqttGateway) SendData(msg dp.DataPointInfo) error {
topic := getTopic(msg.Tags())
if topic == "" {
return errors.New("[MQTT Gateway] Invalid topic")
}

View File

@@ -2,15 +2,22 @@
// for database gateways.
package datapoint
import "time"
import (
"time"
)
type Topic struct {
Subject string
Content string
}
// DataPointInfo provides an interface for accessing
// all fields of a DataPoint.
type DataPointInfo interface {
GetMeasurementName() string
GetTags() map[string]string
GetValuesAsAny() map[string]any
GetTimestamp() time.Time
MeasurementName() string
Tags() []Topic
PayloadAsAny() map[string]any
Timestamp() time.Time
}
// A Measurement represents a type of measurement such as
@@ -23,7 +30,7 @@ type Measurement[T any] struct {
// values, and a timestamp. It contains a pointer to its parent Measurement
type DataPoint[T any] struct {
measurement *Measurement[T]
tags map[string]string
tags []Topic
values map[string]T
timestamp time.Time
}
@@ -36,7 +43,7 @@ func CreateMeasurement[T any](name string) Measurement[T] {
// CreateDataPoint forces the creation of DataPoint with the type according
// to the generic.
func (m *Measurement[T]) CreateDataPoint(tags map[string]string, values map[string]T, timestamp time.Time) DataPoint[T] {
func (m *Measurement[T]) CreateDataPoint(tags []Topic, values map[string]T, timestamp time.Time) DataPoint[T] {
return DataPoint[T]{
measurement: m,
tags: tags,
@@ -45,11 +52,11 @@ func (m *Measurement[T]) CreateDataPoint(tags map[string]string, values map[stri
}
}
func (dp *DataPoint[T]) GetMeasurementName() string {
func (dp *DataPoint[T]) MeasurementName() string {
return dp.measurement.name
}
func (dp *DataPoint[T]) GetTags() map[string]string {
func (dp *DataPoint[T]) Tags() []Topic {
return dp.tags
}

View File

@@ -1,54 +0,0 @@
package datapoint
import (
"strings"
"time"
)
// MessageInfo is an interface for accessing all fields of a Message.
type MessageInfo interface {
Topic() string
TopicParts() []string
Timestamp() time.Time
PayloadAsAny() map[string]any
}
// Message represents an MQTT message with an ordered topic path.
type Message[T any] struct {
topicParts []string
payload map[string]T
timestamp time.Time
}
// CreateMessage creates a new Message with the given topic parts, payload and timestamp.
func CreateMessage[T any](topicParts []string, payload map[string]T, timestamp time.Time) Message[T] {
return Message[T]{
topicParts: topicParts,
payload: payload,
timestamp: timestamp,
}
}
func (m *Message[T]) Topic() string {
return strings.Join(m.topicParts, "/")
}
func (m *Message[T]) TopicParts() []string {
return m.topicParts
}
func (m *Message[T]) Payload() map[string]T {
return m.payload
}
func (m *Message[T]) Timestamp() time.Time {
return m.timestamp
}
func (m *Message[T]) PayloadAsAny() map[string]any {
anyPayload := make(map[string]any, len(m.payload))
for k, v := range m.payload {
anyPayload[k] = v
}
return anyPayload
}