feat(db): add mqtt gateway from previous project

Co-authored-by: Aydong <coudray@nathan.ch>
Signed-off-by: Klagarge <remi@heredero.ch>
Signed-off-by: Aydong <coudray@nathan.ch>
This commit is contained in:
2026-04-30 10:39:05 +02:00
parent 5a1bfefffb
commit 567e9162e2
2 changed files with 233 additions and 0 deletions

179
db/src/mqtt/mqtt_gateway.go Normal file
View File

@@ -0,0 +1,179 @@
// Package mqtt_gateway provides an abstraction to an MQTT broker client.
package mqtt_gateway
import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
data "gateway/point"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// A MqttParams is the abstracted MQTT gateway.
// It provides the MQTT parameters to initialize the connection and the method to add data.
type MqttParams struct {
Broker string
ClientId string
Qos byte
Username string
Password string
TlsConfig *tls.Config
OnConnect mqtt.OnConnectHandler
OnConnectionLost mqtt.ConnectionLostHandler
Timeout time.Duration
}
// MqttGateway is the abstracted MQTT gateway.
// It connects to the MQTT broker and provides the method to send data to the broker.
type MqttGateway struct {
MqttParams MqttParams
Client mqtt.Client
}
// mqttPayload is the JSON structure published to the broker in specific topic.
// It contains the values and the timestamp of the data.
type mqttPayload map[string]any
// connectHandler is called when the client connects to the broker. It prints a message to the console.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
log.Println("[MQTT Gateway] Connected to MQTT Broker")
}
// connectLostHandler is called when the client loses connection to the broker. It prints a message to the console with the error.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Printf("[MQTT Gateway] Connection lost: %v\n", err)
}
// NewMqttGateway creates a new MqttGateway with the given parameters.
// And establishes the connection
func NewMqttGateway(p MqttParams) (*MqttGateway, error) {
// Verify input variable
if p.Broker == "" {
return nil, errors.New("[MQTT Gateway] Invalid broker address")
}
if p.ClientId == "" {
return nil, errors.New("[MQTT Gateway] Invalid client id")
}
if p.Qos > 2 {
return nil, errors.New("[MQTT Gateway] Invalid QoS level")
}
if p.Timeout == 0 {
// Set to default value
p.Timeout = time.Second * 5
}
opts := mqtt.NewClientOptions()
opts.AddBroker(p.Broker)
opts.SetClientID(p.ClientId)
if p.TlsConfig != nil {
opts.SetTLSConfig(p.TlsConfig)
}
if p.OnConnect != nil {
opts.SetOnConnectHandler(p.OnConnect)
} else {
opts.SetOnConnectHandler(connectHandler)
}
if p.OnConnectionLost != nil {
opts.SetConnectionLostHandler(p.OnConnectionLost)
} else {
opts.SetConnectionLostHandler(connectLostHandler)
}
if p.Username != "" {
opts.SetUsername(p.Username)
opts.SetPassword(p.Password)
}
client := mqtt.NewClient(opts)
token := client.Connect()
if !token.WaitTimeout(p.Timeout) {
return nil, fmt.Errorf("[MQTT Gateway] Mqtt connect timed out")
}
if err := token.Error(); err != nil {
return nil, fmt.Errorf("[MQTT Gateway] Mqtt connect failed: %w", err)
}
return &MqttGateway{
MqttParams: p,
Client: client,
}, nil
}
// SendData is used to send a data in the MQTT gateway.
// It uses the DataPointInfo interface for abstracting the generic type of the DataPoint
func (g *MqttGateway) SendData(msg data.MessageInfo) error {
topic := msg.Topic()
if topic == "" {
return errors.New("[MQTT Gateway] Invalid topic")
}
payload := mqttPayload{
"timestamp": msg.Timestamp().Unix(),
}
for key, value := range msg.PayloadAsAny() {
payload[key] = value
}
payloadJson, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("[MQTT Gateway] Failed to marshal payload: %w", err)
}
token := g.Client.Publish(topic, g.MqttParams.Qos, false, payloadJson)
if !token.WaitTimeout(g.MqttParams.Timeout) {
return fmt.Errorf("[MQTT Gateway] Mqtt connect timed out")
}
if token.Error() != nil {
return fmt.Errorf("[MQTT Gateway] Failed to publish message: %w", token.Error())
}
return nil
}
// Disconnect is used to disconnect the MQTT gateway from the broker.
// It prints a message to the console when the disconnection is successful.
func (g *MqttGateway) Disconnect() {
g.Client.Disconnect(0)
log.Println("[MQTT Gateway] Disconnected from MQTT Broker")
}
// Subscribe is used to subscribe to a topic in the MQTT gateway.
// It takes a topic and a callback function as parameters.
// The callback function is called when a message is received on the subscribed topic.
func (g *MqttGateway) Subscribe(topic string, callback mqtt.MessageHandler) error {
token := g.Client.Subscribe(topic, g.MqttParams.Qos, callback)
if !token.WaitTimeout(g.MqttParams.Timeout) {
return fmt.Errorf("[MQTT Gateway] MQTT gateway timed out")
}
if token.Error() != nil {
return fmt.Errorf("[MQTT Gateway] MQTT gateway failed to subscribe: %w", token.Error())
}
log.Printf("[MQTT Gateway] Subscribed to topic: %s\n", topic)
return nil
}
// Unsubscribe is used to unsubscribe from a topic in the MQTT gateway.
func (g *MqttGateway) Unsubscribe(topic string) error {
token := g.Client.Unsubscribe(topic)
if !token.WaitTimeout(g.MqttParams.Timeout) {
return fmt.Errorf("[MQTT Gateway] MQTT gateway timed out")
}
if token.Error() != nil {
return fmt.Errorf("[MQTT Gateway] MQTT gateway failed to unsubscribe: %w", token.Error())
}
log.Printf("[MQTT Gateway] Unsubscribed from topic: %s\n", topic)
return nil
}

54
db/src/point/message.go Normal file
View File

@@ -0,0 +1,54 @@
package datapoint
import (
"strings"
"time"
)
// MessageInfo is an interface for accessing all fields of a Message.
type MessageInfo interface {
Topic() string
TopicParts() []string
Timestamp() time.Time
PayloadAsAny() map[string]any
}
// Message represents an MQTT message with an ordered topic path.
type Message[T any] struct {
topicParts []string
payload map[string]T
timestamp time.Time
}
// CreateMessage creates a new Message with the given topic parts, payload and timestamp.
func CreateMessage[T any](topicParts []string, payload map[string]T, timestamp time.Time) Message[T] {
return Message[T]{
topicParts: topicParts,
payload: payload,
timestamp: timestamp,
}
}
func (m *Message[T]) Topic() string {
return strings.Join(m.topicParts, "/")
}
func (m *Message[T]) TopicParts() []string {
return m.topicParts
}
func (m *Message[T]) Payload() map[string]T {
return m.payload
}
func (m *Message[T]) Timestamp() time.Time {
return m.timestamp
}
func (m *Message[T]) PayloadAsAny() map[string]any {
anyPayload := make(map[string]any, len(m.payload))
for k, v := range m.payload {
anyPayload[k] = v
}
return anyPayload
}