feat(db): add rest gateway
- Implement GET rooms list endpoint only Signed-off-by: Klagarge <remi@heredero.ch>
This commit is contained in:
@@ -1,33 +1,58 @@
|
||||
module gateway
|
||||
|
||||
go 1.24.0
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
github.com/InfluxCommunity/influxdb3-go/v2 v2.13.0
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.1
|
||||
github.com/gin-gonic/gin v1.12.0
|
||||
github.com/stretchr/testify v1.11.1
|
||||
go.uber.org/mock v0.6.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/apache/arrow-go/v18 v18.5.1 // indirect
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.15.0 // indirect
|
||||
github.com/bytedance/sonic/loader v0.5.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.12 // indirect
|
||||
github.com/gin-contrib/sse v1.1.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.30.1 // indirect
|
||||
github.com/goccy/go-json v0.10.5 // indirect
|
||||
github.com/goccy/go-yaml v1.19.2 // indirect
|
||||
github.com/google/flatbuffers v25.12.19+incompatible // indirect
|
||||
github.com/gorilla/websocket v1.5.3 // indirect
|
||||
github.com/influxdata/line-protocol/v2 v2.2.1 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.18.2 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.23 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/quic-go/qpack v0.6.0 // indirect
|
||||
github.com/quic-go/quic-go v0.59.0 // indirect
|
||||
github.com/rogpeppe/go-internal v1.14.1 // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/ugorji/go/codec v1.3.1 // indirect
|
||||
github.com/zeebo/xxh3 v1.0.2 // indirect
|
||||
go.mongodb.org/mongo-driver/v2 v2.5.0 // indirect
|
||||
golang.org/x/arch v0.22.0 // indirect
|
||||
golang.org/x/crypto v0.48.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect
|
||||
golang.org/x/mod v0.32.0 // indirect
|
||||
golang.org/x/net v0.49.0 // indirect
|
||||
golang.org/x/net v0.51.0 // indirect
|
||||
golang.org/x/sync v0.19.0 // indirect
|
||||
golang.org/x/sys v0.40.0 // indirect
|
||||
golang.org/x/sys v0.41.0 // indirect
|
||||
golang.org/x/telemetry v0.0.0-20260109210033-bd525da824e2 // indirect
|
||||
golang.org/x/text v0.33.0 // indirect
|
||||
golang.org/x/text v0.34.0 // indirect
|
||||
golang.org/x/tools v0.41.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
// Package influx_gateway_Lib provides an abstraction to the client influx.
|
||||
// Package influx provides an abstraction to the client influx.
|
||||
package influx
|
||||
|
||||
import (
|
||||
@@ -17,6 +17,7 @@ type Gateway interface {
|
||||
AddDatapoint(dp datapoint.DataPointInfo) error
|
||||
Close() error
|
||||
Flush() error
|
||||
Query(ctx context.Context, query string) (*influxdb3.QueryIterator, error)
|
||||
}
|
||||
|
||||
// An InfluxGateway is the abstracted influx gateway.
|
||||
@@ -96,3 +97,8 @@ func (g *InfluxGateway) Flush() error {
|
||||
func (g *InfluxGateway) Close() error {
|
||||
return g.client.Close()
|
||||
}
|
||||
|
||||
// Query executes a SQL query against InfluxDB using the Arrow Flight (gRPC) API.
|
||||
func (g *InfluxGateway) Query(ctx context.Context, query string) (*influxdb3.QueryIterator, error) {
|
||||
return g.client.Query(ctx, query)
|
||||
}
|
||||
|
||||
@@ -5,8 +5,10 @@ import (
|
||||
"gateway/influx"
|
||||
"gateway/mqtt"
|
||||
point "gateway/point"
|
||||
"gateway/rest"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -57,7 +59,7 @@ func mqttConnection() *mqtt.MqttGateway {
|
||||
func influxConnection() *influx.InfluxGateway {
|
||||
influxUrl, ok := os.LookupEnv("INFLUX_URL")
|
||||
if !ok {
|
||||
influxUrl = "https://db.e.kb28.ch:443"
|
||||
influxUrl = "http://db.e.kb28.ch:8181"
|
||||
}
|
||||
influxDatabase, ok := os.LookupEnv("INFLUX_DATABASE")
|
||||
if !ok {
|
||||
@@ -69,7 +71,7 @@ func influxConnection() *influx.InfluxGateway {
|
||||
influxToken = "password"
|
||||
}
|
||||
|
||||
log.Printf("[Main] InfluxDB config: URL=%s, DB=%s, Org=%s, Timeout=%v\n", influxUrl, influxDatabase)
|
||||
log.Printf("[Main] InfluxDB config: URL=%s, DB=%s\n", influxUrl, influxDatabase)
|
||||
|
||||
// Create the gateway
|
||||
gateway, err := influx.NewInfluxGateway(influxUrl, influxToken, influxDatabase)
|
||||
@@ -105,6 +107,18 @@ func main() {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Keep the application running to receive messages
|
||||
// Initialize and start REST Gateway
|
||||
restGateway := rest.NewRestGateway(influxGateway)
|
||||
|
||||
port, ok := os.LookupEnv("REST_PORT")
|
||||
if !ok {
|
||||
port = "8080"
|
||||
}
|
||||
|
||||
log.Printf("[Main] Starting REST Gateway on port %s\n", port)
|
||||
if err := restGateway.Run(":" + port); err != nil {
|
||||
log.Fatalf("[Main] Failed to start REST Gateway: %v", err)
|
||||
}
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
63
db/src/rest/rest.go
Normal file
63
db/src/rest/rest.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package rest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"gateway/influx"
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type RestGateway struct {
|
||||
influxGateway *influx.InfluxGateway
|
||||
engine *gin.Engine
|
||||
}
|
||||
|
||||
func NewRestGateway(influxGateway *influx.InfluxGateway) *RestGateway {
|
||||
g := &RestGateway{
|
||||
influxGateway: influxGateway,
|
||||
engine: gin.Default(),
|
||||
}
|
||||
|
||||
g.setupRoutes()
|
||||
return g
|
||||
}
|
||||
|
||||
func (g *RestGateway) setupRoutes() {
|
||||
v1 := g.engine.Group("/api/v1")
|
||||
{
|
||||
v1.GET("/rooms", g.getRooms)
|
||||
}
|
||||
}
|
||||
|
||||
func (g *RestGateway) Run(addr string) error {
|
||||
return g.engine.Run(addr)
|
||||
}
|
||||
|
||||
// GET /api/v1/rooms
|
||||
func (g *RestGateway) getRooms(c *gin.Context) {
|
||||
// Query unique rooms from the "provence" measurement
|
||||
query := `SELECT DISTINCT("room") FROM "provence"`
|
||||
|
||||
// Using context.Background() as seen in working snippet
|
||||
it, err := g.influxGateway.Query(context.Background(), query)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
var rooms []string
|
||||
for it.Next() {
|
||||
val := it.Value()
|
||||
if room, ok := val["room"].(string); ok {
|
||||
rooms = append(rooms, room)
|
||||
}
|
||||
}
|
||||
|
||||
if err := it.Err(); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, rooms)
|
||||
}
|
||||
Reference in New Issue
Block a user