feat(db): add influx gateway from previous project
Co-authored-by: fastium <fastium.pro@proton.me> Signed-off-by: Klagarge <remi@heredero.ch> Signed-off-by: fastium <fastium.pro@proton.me>
This commit is contained in:
98
db/src/influx/influx_gateway.go
Normal file
98
db/src/influx/influx_gateway.go
Normal file
@@ -0,0 +1,98 @@
|
||||
// Package influx_gateway_Lib provides an abstraction to the client influx.
|
||||
package influx_gateway
|
||||
|
||||
import (
|
||||
"context"
|
||||
datapoint "gateway/point"
|
||||
|
||||
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
|
||||
"github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching"
|
||||
)
|
||||
|
||||
// Number of points to batch before pushing to InfluxDB
|
||||
const BATCH_SIZE = 50
|
||||
|
||||
// Capacity maximum of the buffer
|
||||
const BATCH_CAPACITY = 50
|
||||
|
||||
// Gateway provides the abstraction of all gateway to send datapoints
|
||||
type Gateway interface {
|
||||
AddDatapoint(dp datapoint.DataPointInfo) error
|
||||
Close() error
|
||||
Flush() error
|
||||
}
|
||||
|
||||
// An InfluxGateway is the abstracted influx gateway.
|
||||
// It provides the influx parameters to initialize the connection
|
||||
// and the batcher to batch the points before pushing to Influx.
|
||||
type InfluxGateway struct {
|
||||
client *influxdb3.Client
|
||||
batcher *batching.Batcher
|
||||
}
|
||||
|
||||
// NewInfluxGateway creates a new InfluxGateway with the given parameters.
|
||||
func NewInfluxGateway(url string, token string, database string) (*InfluxGateway, error) {
|
||||
|
||||
client, err := influxdb3.New(influxdb3.ClientConfig{
|
||||
Host: url,
|
||||
Token: token,
|
||||
Database: database,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &InfluxGateway{
|
||||
client: client,
|
||||
batcher: batching.NewBatcher(
|
||||
batching.WithSize(BATCH_SIZE),
|
||||
batching.WithInitialCapacity(BATCH_CAPACITY),
|
||||
),
|
||||
},
|
||||
nil
|
||||
}
|
||||
|
||||
// AddDatapoint is used to add a datapoint in the batcher. It uses the
|
||||
// DataPointInfo interface for abstracting the generic type of the DataPoint.
|
||||
// It pushs the batch of point when the number of point >= batch size.
|
||||
func (g *InfluxGateway) AddDatapoint(dp datapoint.DataPointInfo) error {
|
||||
|
||||
g.batcher.Add(
|
||||
influxdb3.NewPoint(
|
||||
dp.GetMeasurementName(),
|
||||
dp.GetTags(),
|
||||
dp.GetValuesAsAny(),
|
||||
dp.GetTimestamp(),
|
||||
),
|
||||
)
|
||||
|
||||
// check if the number of point >= batchsize
|
||||
if g.batcher.Ready() {
|
||||
// Send batch to influx DB
|
||||
err := g.Flush()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Flush sends the current batch of points to InfluxDB.
|
||||
func (g *InfluxGateway) Flush() error {
|
||||
// Send batch to influx DB
|
||||
err := g.client.WritePoints(context.Background(), g.batcher.Emit())
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the InfluxGateway client.
|
||||
func (g *InfluxGateway) Close() error {
|
||||
return g.client.Close()
|
||||
}
|
||||
73
db/src/point/datapoint.go
Normal file
73
db/src/point/datapoint.go
Normal file
@@ -0,0 +1,73 @@
|
||||
// Package datapoint implements measurement and datapoint
|
||||
// for database gateways.
|
||||
package datapoint
|
||||
|
||||
import "time"
|
||||
|
||||
// DataPointInfo provides an interface for accessing
|
||||
// all fields of a DataPoint.
|
||||
type DataPointInfo interface {
|
||||
GetMeasurementName() string
|
||||
GetTags() map[string]string
|
||||
GetValuesAsAny() map[string]any
|
||||
GetTimestamp() time.Time
|
||||
}
|
||||
|
||||
// A Measurement represents a type of measurement such as
|
||||
// temperature, humidity, ...
|
||||
type Measurement[T any] struct {
|
||||
name string
|
||||
}
|
||||
|
||||
// A DataPoint represents values associated with a measurement, along with tags,
|
||||
// values and a timestamp. It contains a pointer to its parent Measurement
|
||||
type DataPoint[T any] struct {
|
||||
measurement *Measurement[T]
|
||||
tags map[string]string
|
||||
values map[string]T
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
func CreateMeasurement[T any](name string) Measurement[T] {
|
||||
return Measurement[T]{
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
// CreateDataPoint forces the creation of DataPoint with the type according
|
||||
// to the generic.
|
||||
func (m *Measurement[T]) CreateDataPoint(tags map[string]string, values map[string]T, timestamp time.Time) DataPoint[T] {
|
||||
return DataPoint[T]{
|
||||
measurement: m,
|
||||
tags: tags,
|
||||
values: values,
|
||||
timestamp: timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
func (dp *DataPoint[T]) GetMeasurementName() string {
|
||||
return dp.measurement.name
|
||||
}
|
||||
|
||||
func (dp *DataPoint[T]) GetTags() map[string]string {
|
||||
return dp.tags
|
||||
}
|
||||
|
||||
func (dp *DataPoint[T]) GetValues() map[string]T {
|
||||
return dp.values
|
||||
}
|
||||
|
||||
// GetValuesAny returns the DataPoint value with type any.
|
||||
// (usefull for adding the DataPoint for batching)
|
||||
func (dp *DataPoint[T]) GetValuesAsAny() map[string]any {
|
||||
anyValues := make(map[string]any)
|
||||
|
||||
for k, v := range dp.values {
|
||||
anyValues[k] = v
|
||||
}
|
||||
return anyValues
|
||||
}
|
||||
|
||||
func (dp *DataPoint[T]) GetTimestamp() time.Time {
|
||||
return dp.timestamp
|
||||
}
|
||||
Reference in New Issue
Block a user