From 567e9162e29c3594bf5d6e3e7a6cfa426f103035 Mon Sep 17 00:00:00 2001 From: Klagarge Date: Thu, 30 Apr 2026 10:39:05 +0200 Subject: [PATCH] feat(db): add mqtt gateway from previous project Co-authored-by: Aydong Signed-off-by: Klagarge Signed-off-by: Aydong --- db/src/mqtt/mqtt_gateway.go | 179 ++++++++++++++++++++++++++++++++++++ db/src/point/message.go | 54 +++++++++++ 2 files changed, 233 insertions(+) create mode 100644 db/src/mqtt/mqtt_gateway.go create mode 100644 db/src/point/message.go diff --git a/db/src/mqtt/mqtt_gateway.go b/db/src/mqtt/mqtt_gateway.go new file mode 100644 index 0000000..fe497d9 --- /dev/null +++ b/db/src/mqtt/mqtt_gateway.go @@ -0,0 +1,179 @@ +// Package mqtt_gateway provides an abstraction to an MQTT broker client. +package mqtt_gateway + +import ( + "crypto/tls" + "encoding/json" + "errors" + "fmt" + data "gateway/point" + "log" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// A MqttParams is the abstracted MQTT gateway. +// It provides the MQTT parameters to initialize the connection and the method to add data. +type MqttParams struct { + Broker string + ClientId string + Qos byte + Username string + Password string + TlsConfig *tls.Config + OnConnect mqtt.OnConnectHandler + OnConnectionLost mqtt.ConnectionLostHandler + Timeout time.Duration +} + +// MqttGateway is the abstracted MQTT gateway. +// It connects to the MQTT broker and provides the method to send data to the broker. +type MqttGateway struct { + MqttParams MqttParams + Client mqtt.Client +} + +// mqttPayload is the JSON structure published to the broker in specific topic. +// It contains the values and the timestamp of the data. +type mqttPayload map[string]any + +// connectHandler is called when the client connects to the broker. It prints a message to the console. +var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { + log.Println("[MQTT Gateway] Connected to MQTT Broker") +} + +// connectLostHandler is called when the client loses connection to the broker. It prints a message to the console with the error. +var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { + log.Printf("[MQTT Gateway] Connection lost: %v\n", err) +} + +// NewMqttGateway creates a new MqttGateway with the given parameters. +// And establishes the connection +func NewMqttGateway(p MqttParams) (*MqttGateway, error) { + // Verify input variable + if p.Broker == "" { + return nil, errors.New("[MQTT Gateway] Invalid broker address") + } + + if p.ClientId == "" { + return nil, errors.New("[MQTT Gateway] Invalid client id") + } + + if p.Qos > 2 { + return nil, errors.New("[MQTT Gateway] Invalid QoS level") + } + + if p.Timeout == 0 { + // Set to default value + p.Timeout = time.Second * 5 + } + + opts := mqtt.NewClientOptions() + opts.AddBroker(p.Broker) + opts.SetClientID(p.ClientId) + + if p.TlsConfig != nil { + opts.SetTLSConfig(p.TlsConfig) + } + + if p.OnConnect != nil { + opts.SetOnConnectHandler(p.OnConnect) + } else { + opts.SetOnConnectHandler(connectHandler) + } + + if p.OnConnectionLost != nil { + opts.SetConnectionLostHandler(p.OnConnectionLost) + } else { + opts.SetConnectionLostHandler(connectLostHandler) + } + + if p.Username != "" { + opts.SetUsername(p.Username) + opts.SetPassword(p.Password) + } + + client := mqtt.NewClient(opts) + token := client.Connect() + if !token.WaitTimeout(p.Timeout) { + return nil, fmt.Errorf("[MQTT Gateway] Mqtt connect timed out") + } + if err := token.Error(); err != nil { + return nil, fmt.Errorf("[MQTT Gateway] Mqtt connect failed: %w", err) + } + + return &MqttGateway{ + MqttParams: p, + Client: client, + }, nil +} + +// SendData is used to send a data in the MQTT gateway. +// It uses the DataPointInfo interface for abstracting the generic type of the DataPoint +func (g *MqttGateway) SendData(msg data.MessageInfo) error { + topic := msg.Topic() + if topic == "" { + return errors.New("[MQTT Gateway] Invalid topic") + } + + payload := mqttPayload{ + "timestamp": msg.Timestamp().Unix(), + } + + for key, value := range msg.PayloadAsAny() { + payload[key] = value + } + + payloadJson, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("[MQTT Gateway] Failed to marshal payload: %w", err) + } + + token := g.Client.Publish(topic, g.MqttParams.Qos, false, payloadJson) + if !token.WaitTimeout(g.MqttParams.Timeout) { + return fmt.Errorf("[MQTT Gateway] Mqtt connect timed out") + } + if token.Error() != nil { + return fmt.Errorf("[MQTT Gateway] Failed to publish message: %w", token.Error()) + } + + return nil +} + +// Disconnect is used to disconnect the MQTT gateway from the broker. +// It prints a message to the console when the disconnection is successful. +func (g *MqttGateway) Disconnect() { + g.Client.Disconnect(0) + log.Println("[MQTT Gateway] Disconnected from MQTT Broker") +} + +// Subscribe is used to subscribe to a topic in the MQTT gateway. +// It takes a topic and a callback function as parameters. +// The callback function is called when a message is received on the subscribed topic. +func (g *MqttGateway) Subscribe(topic string, callback mqtt.MessageHandler) error { + token := g.Client.Subscribe(topic, g.MqttParams.Qos, callback) + if !token.WaitTimeout(g.MqttParams.Timeout) { + return fmt.Errorf("[MQTT Gateway] MQTT gateway timed out") + } + if token.Error() != nil { + return fmt.Errorf("[MQTT Gateway] MQTT gateway failed to subscribe: %w", token.Error()) + } + + log.Printf("[MQTT Gateway] Subscribed to topic: %s\n", topic) + return nil +} + +// Unsubscribe is used to unsubscribe from a topic in the MQTT gateway. +func (g *MqttGateway) Unsubscribe(topic string) error { + token := g.Client.Unsubscribe(topic) + if !token.WaitTimeout(g.MqttParams.Timeout) { + return fmt.Errorf("[MQTT Gateway] MQTT gateway timed out") + } + if token.Error() != nil { + return fmt.Errorf("[MQTT Gateway] MQTT gateway failed to unsubscribe: %w", token.Error()) + } + + log.Printf("[MQTT Gateway] Unsubscribed from topic: %s\n", topic) + return nil +} diff --git a/db/src/point/message.go b/db/src/point/message.go new file mode 100644 index 0000000..568fba0 --- /dev/null +++ b/db/src/point/message.go @@ -0,0 +1,54 @@ +package datapoint + +import ( + "strings" + "time" +) + +// MessageInfo is an interface for accessing all fields of a Message. +type MessageInfo interface { + Topic() string + TopicParts() []string + Timestamp() time.Time + PayloadAsAny() map[string]any +} + +// Message represents an MQTT message with an ordered topic path. +type Message[T any] struct { + topicParts []string + payload map[string]T + timestamp time.Time +} + +// CreateMessage creates a new Message with the given topic parts, payload and timestamp. +func CreateMessage[T any](topicParts []string, payload map[string]T, timestamp time.Time) Message[T] { + return Message[T]{ + topicParts: topicParts, + payload: payload, + timestamp: timestamp, + } +} + +func (m *Message[T]) Topic() string { + return strings.Join(m.topicParts, "/") +} + +func (m *Message[T]) TopicParts() []string { + return m.topicParts +} + +func (m *Message[T]) Payload() map[string]T { + return m.payload +} + +func (m *Message[T]) Timestamp() time.Time { + return m.timestamp +} + +func (m *Message[T]) PayloadAsAny() map[string]any { + anyPayload := make(map[string]any, len(m.payload)) + for k, v := range m.payload { + anyPayload[k] = v + } + return anyPayload +}