diff --git a/db/src/go.mod b/db/src/go.mod new file mode 100644 index 0000000..6a4804c --- /dev/null +++ b/db/src/go.mod @@ -0,0 +1,37 @@ +module gateway + +go 1.24.0 + +require ( + github.com/InfluxCommunity/influxdb3-go/v2 v2.13.0 + github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/stretchr/testify v1.11.1 +) + +require ( + github.com/apache/arrow-go/v18 v18.5.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/goccy/go-json v0.10.5 // indirect + github.com/google/flatbuffers v25.12.19+incompatible // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/influxdata/line-protocol/v2 v2.2.1 // indirect + github.com/klauspost/compress v1.18.2 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect + github.com/pierrec/lz4/v4 v4.1.23 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect + golang.org/x/mod v0.32.0 // indirect + golang.org/x/net v0.49.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2 // indirect + golang.org/x/text v0.33.0 // indirect + golang.org/x/tools v0.41.0 // indirect + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect + google.golang.org/grpc v1.79.1 // indirect + google.golang.org/protobuf v1.36.11 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/db/src/influx/influx_gateway.go b/db/src/influx/influx_gateway.go index b2d288e..edd193b 100644 --- a/db/src/influx/influx_gateway.go +++ b/db/src/influx/influx_gateway.go @@ -55,12 +55,16 @@ func NewInfluxGateway(url string, token string, database string) (*InfluxGateway // It pushes the batch of point when the number of points >= batch size. func (g *InfluxGateway) AddDatapoint(dp datapoint.DataPointInfo) error { + tagsList := map[string]string{} + for _, t := range dp.Tags() { + tagsList[t.Subject] = t.Content + } g.batcher.Add( influxdb3.NewPoint( - dp.GetMeasurementName(), - dp.GetTags(), - dp.GetValuesAsAny(), - dp.GetTimestamp(), + dp.MeasurementName(), + tagsList, + dp.PayloadAsAny(), + dp.Timestamp(), ), ) diff --git a/db/src/mqtt/mqtt_gateway.go b/db/src/mqtt/mqtt_gateway.go index 994c5c7..631d213 100644 --- a/db/src/mqtt/mqtt_gateway.go +++ b/db/src/mqtt/mqtt_gateway.go @@ -6,8 +6,9 @@ import ( "encoding/json" "errors" "fmt" - data "gateway/point" + dp "gateway/point" "log" + "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -48,6 +49,14 @@ var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err log.Printf("[MQTT Gateway] Connection lost: %v\n", err) } +func getTopic(t []dp.Topic) string { + var topic []string + for _, t := range t { + topic = append(topic, t.Content) + } + return strings.Join(topic, "/") +} + // NewMqttGateway creates a new MqttGateway with the given parameters. // And establishes the connection func NewMqttGateway(p MqttParams) (*MqttGateway, error) { @@ -111,8 +120,9 @@ func NewMqttGateway(p MqttParams) (*MqttGateway, error) { // SendData is used to send data in the MQTT gateway. // It uses the DataPointInfo interface for abstracting the generic type of the DataPoint -func (g *MqttGateway) SendData(msg data.MessageInfo) error { - topic := msg.Topic() +func (g *MqttGateway) SendData(msg dp.DataPointInfo) error { + + topic := getTopic(msg.Tags()) if topic == "" { return errors.New("[MQTT Gateway] Invalid topic") } diff --git a/db/src/point/datapoint.go b/db/src/point/datapoint.go index 4817f36..599c01d 100644 --- a/db/src/point/datapoint.go +++ b/db/src/point/datapoint.go @@ -2,15 +2,22 @@ // for database gateways. package datapoint -import "time" +import ( + "time" +) + +type Topic struct { + Subject string + Content string +} // DataPointInfo provides an interface for accessing // all fields of a DataPoint. type DataPointInfo interface { - GetMeasurementName() string - GetTags() map[string]string - GetValuesAsAny() map[string]any - GetTimestamp() time.Time + MeasurementName() string + Tags() []Topic + PayloadAsAny() map[string]any + Timestamp() time.Time } // A Measurement represents a type of measurement such as @@ -23,7 +30,7 @@ type Measurement[T any] struct { // values, and a timestamp. It contains a pointer to its parent Measurement type DataPoint[T any] struct { measurement *Measurement[T] - tags map[string]string + tags []Topic values map[string]T timestamp time.Time } @@ -36,7 +43,7 @@ func CreateMeasurement[T any](name string) Measurement[T] { // CreateDataPoint forces the creation of DataPoint with the type according // to the generic. -func (m *Measurement[T]) CreateDataPoint(tags map[string]string, values map[string]T, timestamp time.Time) DataPoint[T] { +func (m *Measurement[T]) CreateDataPoint(tags []Topic, values map[string]T, timestamp time.Time) DataPoint[T] { return DataPoint[T]{ measurement: m, tags: tags, @@ -45,11 +52,11 @@ func (m *Measurement[T]) CreateDataPoint(tags map[string]string, values map[stri } } -func (dp *DataPoint[T]) GetMeasurementName() string { +func (dp *DataPoint[T]) MeasurementName() string { return dp.measurement.name } -func (dp *DataPoint[T]) GetTags() map[string]string { +func (dp *DataPoint[T]) Tags() []Topic { return dp.tags } diff --git a/db/src/point/message.go b/db/src/point/message.go deleted file mode 100644 index 568fba0..0000000 --- a/db/src/point/message.go +++ /dev/null @@ -1,54 +0,0 @@ -package datapoint - -import ( - "strings" - "time" -) - -// MessageInfo is an interface for accessing all fields of a Message. -type MessageInfo interface { - Topic() string - TopicParts() []string - Timestamp() time.Time - PayloadAsAny() map[string]any -} - -// Message represents an MQTT message with an ordered topic path. -type Message[T any] struct { - topicParts []string - payload map[string]T - timestamp time.Time -} - -// CreateMessage creates a new Message with the given topic parts, payload and timestamp. -func CreateMessage[T any](topicParts []string, payload map[string]T, timestamp time.Time) Message[T] { - return Message[T]{ - topicParts: topicParts, - payload: payload, - timestamp: timestamp, - } -} - -func (m *Message[T]) Topic() string { - return strings.Join(m.topicParts, "/") -} - -func (m *Message[T]) TopicParts() []string { - return m.topicParts -} - -func (m *Message[T]) Payload() map[string]T { - return m.payload -} - -func (m *Message[T]) Timestamp() time.Time { - return m.timestamp -} - -func (m *Message[T]) PayloadAsAny() map[string]any { - anyPayload := make(map[string]any, len(m.payload)) - for k, v := range m.payload { - anyPayload[k] = v - } - return anyPayload -}