diff --git a/db/src/influx/influx_gateway.go b/db/src/influx/influx_gateway.go new file mode 100644 index 0000000..c6440cd --- /dev/null +++ b/db/src/influx/influx_gateway.go @@ -0,0 +1,98 @@ +// Package influx_gateway_Lib provides an abstraction to the client influx. +package influx_gateway + +import ( + "context" + datapoint "gateway/point" + + "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" + "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3/batching" +) + +// Number of points to batch before pushing to InfluxDB +const BATCH_SIZE = 50 + +// Capacity maximum of the buffer +const BATCH_CAPACITY = 50 + +// Gateway provides the abstraction of all gateway to send datapoints +type Gateway interface { + AddDatapoint(dp datapoint.DataPointInfo) error + Close() error + Flush() error +} + +// An InfluxGateway is the abstracted influx gateway. +// It provides the influx parameters to initialize the connection +// and the batcher to batch the points before pushing to Influx. +type InfluxGateway struct { + client *influxdb3.Client + batcher *batching.Batcher +} + +// NewInfluxGateway creates a new InfluxGateway with the given parameters. +func NewInfluxGateway(url string, token string, database string) (*InfluxGateway, error) { + + client, err := influxdb3.New(influxdb3.ClientConfig{ + Host: url, + Token: token, + Database: database, + }) + + if err != nil { + return nil, err + } + + return &InfluxGateway{ + client: client, + batcher: batching.NewBatcher( + batching.WithSize(BATCH_SIZE), + batching.WithInitialCapacity(BATCH_CAPACITY), + ), + }, + nil +} + +// AddDatapoint is used to add a datapoint in the batcher. It uses the +// DataPointInfo interface for abstracting the generic type of the DataPoint. +// It pushs the batch of point when the number of point >= batch size. +func (g *InfluxGateway) AddDatapoint(dp datapoint.DataPointInfo) error { + + g.batcher.Add( + influxdb3.NewPoint( + dp.GetMeasurementName(), + dp.GetTags(), + dp.GetValuesAsAny(), + dp.GetTimestamp(), + ), + ) + + // check if the number of point >= batchsize + if g.batcher.Ready() { + // Send batch to influx DB + err := g.Flush() + + if err != nil { + return err + } + + } + return nil +} + +// Flush sends the current batch of points to InfluxDB. +func (g *InfluxGateway) Flush() error { + // Send batch to influx DB + err := g.client.WritePoints(context.Background(), g.batcher.Emit()) + + if err != nil { + return err + } + + return nil +} + +// Close closes the InfluxGateway client. +func (g *InfluxGateway) Close() error { + return g.client.Close() +} diff --git a/db/src/point/datapoint.go b/db/src/point/datapoint.go new file mode 100644 index 0000000..bebd6a8 --- /dev/null +++ b/db/src/point/datapoint.go @@ -0,0 +1,73 @@ +// Package datapoint implements measurement and datapoint +// for database gateways. +package datapoint + +import "time" + +// DataPointInfo provides an interface for accessing +// all fields of a DataPoint. +type DataPointInfo interface { + GetMeasurementName() string + GetTags() map[string]string + GetValuesAsAny() map[string]any + GetTimestamp() time.Time +} + +// A Measurement represents a type of measurement such as +// temperature, humidity, ... +type Measurement[T any] struct { + name string +} + +// A DataPoint represents values associated with a measurement, along with tags, +// values and a timestamp. It contains a pointer to its parent Measurement +type DataPoint[T any] struct { + measurement *Measurement[T] + tags map[string]string + values map[string]T + timestamp time.Time +} + +func CreateMeasurement[T any](name string) Measurement[T] { + return Measurement[T]{ + name: name, + } +} + +// CreateDataPoint forces the creation of DataPoint with the type according +// to the generic. +func (m *Measurement[T]) CreateDataPoint(tags map[string]string, values map[string]T, timestamp time.Time) DataPoint[T] { + return DataPoint[T]{ + measurement: m, + tags: tags, + values: values, + timestamp: timestamp, + } +} + +func (dp *DataPoint[T]) GetMeasurementName() string { + return dp.measurement.name +} + +func (dp *DataPoint[T]) GetTags() map[string]string { + return dp.tags +} + +func (dp *DataPoint[T]) GetValues() map[string]T { + return dp.values +} + +// GetValuesAny returns the DataPoint value with type any. +// (usefull for adding the DataPoint for batching) +func (dp *DataPoint[T]) GetValuesAsAny() map[string]any { + anyValues := make(map[string]any) + + for k, v := range dp.values { + anyValues[k] = v + } + return anyValues +} + +func (dp *DataPoint[T]) GetTimestamp() time.Time { + return dp.timestamp +}